Spaces:
Running
Running
| """ | |
| chunker.py | |
| ========== | |
| Phase 3 — Document Chunking | |
| Converts the structured output from pdf_processor.py into chunks ready for | |
| embedding and storage in ChromaDB. | |
| Why not LangChain SemanticChunker? | |
| ----------------------------------- | |
| LangChain's SemanticChunker treats every document as a flat string. It has | |
| no concept of headings, section hierarchy, or whether a block of text is a | |
| table or a paragraph. Docling's HybridChunker, on the other hand, works on | |
| the native DoclingDocument object and understands the full visual structure | |
| parsed by the DocLayNet layout model. | |
| Why not HybridChunker for tables? | |
| ----------------------------------- | |
| HybridChunker splits tables to fit within the token limit. For financial | |
| documents, a holdings table with 30 rows becomes 5 fragments — each fragment | |
| loses the column-header context. An LLM reading "Ticker = GS. Sector = | |
| Financial Services." with no table headers cannot reconstruct the original | |
| table structure reliably. | |
| Solution: best of both worlds | |
| ------------------------------- | |
| TEXT → Docling HybridChunker (tokenizer-aware, structure-aware, | |
| full heading path kept in chunk metadata, respects document hierarchy) | |
| TABLES → Atomic markdown pass-through (one complete table = one chunk, | |
| markdown preserves column/row alignment, never split) | |
| This gives semantic coherence for text AND lossless fidelity for tables. | |
| Output format per chunk | |
| ------------------------ | |
| { | |
| "chunk_id" : "ptc01302411420_text_0042", | |
| "doc_id" : "ptc01302411420", | |
| "chunk_type" : "text" | "table", | |
| "text" : "...", # prose or full markdown table | |
| "is_atomic" : false | true, | |
| "metadata" : { | |
| "source" : "morningstar" | "sec_edgar", | |
| "doc_type" : "research_report" | "10-K" | "10-Q" | "8-K", | |
| "file_name" : "ptc01302411420.pdf", | |
| "page_num" : 5, | |
| "section_title" : "Financial Summary", # immediate heading | |
| "heading_path" : "Results > Financial Summary", # full hierarchy | |
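| ... # all keys from document metadata | |
| # (section_title / heading_path are set on text chunks only; table | |
| #  chunks carry table_index, rows, cols, col_headers instead) | |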
| } | |
| } | |
| Usage (as a module) | |
| ------------------- | |
| from src.chunker import DocumentChunker | |
| chunker = DocumentChunker() | |
| chunks = chunker.chunk_document("data/processed/morningstar/ptc01302411420.json") | |
| Usage (as a script) | |
| ------------------- | |
| python src/chunker.py | |
| python src/chunker.py --force # re-chunk even if output exists | |
| """ | |
import json
import logging
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
# ── Logging ────────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
log = logging.getLogger(__name__)

# ── Paths ──────────────────────────────────────────────────────────────────────
# Project root = two directories above this file (src/chunker.py -> <root>).
BASE_DIR = Path(__file__).parent.parent
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed")  # inputs: processed JSONs
CHUNKS_DIR = BASE_DIR.joinpath("data", "chunks")        # outputs: *_chunks.json

# ── Constants ──────────────────────────────────────────────────────────────────
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
MAX_TOKENS = 256  # all-MiniLM-L6-v2 context window
# ──────────────────────────────────────────────────────────────────────────────
# TEXT CHUNKING — Docling HybridChunker
# ──────────────────────────────────────────────────────────────────────────────
# HybridChunker works on the native DoclingDocument, understanding:
#   • Section headings and their hierarchy
#   • Reading order determined by DocLayNet
#   • Token limits of the target embedding model
#
# For each text chunk it produces:
#   chunk.text           — the prose content
#   chunk.meta.headings  — list of heading titles from root to this section,
#                          e.g. ["Results", "Financial Summary", "Revenue"]
#   chunk.meta.doc_items — original DocItem objects (for page number lookup)
#
# We SKIP chunks whose doc_items contain a TableItem — tables are handled
# separately with our atomic markdown approach. Headings are kept in the chunk
# metadata (section_title / heading_path); they are not prepended to the text.
# ──────────────────────────────────────────────────────────────────────────────
@lru_cache(maxsize=1)
def _get_hybrid_chunker():
    """Return the HybridChunker configured for EMBEDDING_MODEL / MAX_TOKENS.

    Built on first use and reused for every later document, so the tokenizer
    named by EMBEDDING_MODEL is resolved once per process instead of once per
    file (presumably a Hugging Face tokenizer load — the costly part of
    construction).
    """
    from docling.chunking import HybridChunker

    return HybridChunker(
        tokenizer=EMBEDDING_MODEL,
        max_tokens=MAX_TOKENS,
        merge_peers=True,  # merge small adjacent chunks sharing same heading
    )


def chunk_text_with_hybrid(
    docling_doc_path: Path,
    doc_meta: dict,
    removed_pages: set,
) -> list[dict]:
    """
    Use Docling HybridChunker to split text sections of a document.

    Tables are skipped here — they are handled by chunk_tables_atomic().

    Args:
        docling_doc_path : path to the _docling.json file saved by Phase 2
        doc_meta         : document-level metadata dict (from processed JSON);
                           copied into every chunk's metadata
        removed_pages    : set of page numbers filtered out by the noise filter

    Returns:
        list of text chunk dicts (chunk_type="text") with keys chunk_type,
        text, is_atomic and metadata (doc_meta plus section_title,
        heading_path, page_num). chunk_id / doc_id are attached later.
    """
    from docling.datamodel.document import DoclingDocument, TableItem

    # Load the native DoclingDocument. JSON is read as UTF-8 (the JSON
    # interchange encoding) so the result does not depend on the OS locale.
    with open(docling_doc_path, encoding="utf-8") as f:
        dl_doc = DoclingDocument.model_validate_json(f.read())

    chunks: list[dict] = []
    dropped_mixed = 0  # skipped chunks that also contained non-table items

    for chunk in _get_hybrid_chunker().chunk(dl_doc):
        doc_items = chunk.meta.doc_items

        # ── Skip table chunks ─────────────────────────────────────────────
        # HybridChunker splits large tables by token count, which destroys
        # column-header context; tables come from chunk_tables_atomic()
        # instead (one table = one chunk, never split).
        # NOTE(review): if a chunk holds a table AND other items (e.g. a small
        # table folded into neighbouring prose by merge_peers — verify against
        # docling-core), that prose is lost too; count it so it shows in logs.
        is_table = [isinstance(item, TableItem) for item in doc_items]
        if any(is_table):
            if not all(is_table):
                dropped_mixed += 1
            continue

        # ── Page number from the first doc item that has provenance ──────
        page_num = next(
            (item.prov[0].page_no for item in doc_items if item.prov),
            None,
        )

        # ── Skip chunks on noise pages ────────────────────────────────────
        if page_num in removed_pages:
            continue

        # ── Skip empty chunks ─────────────────────────────────────────────
        text = chunk.text.strip()
        if not text:
            continue

        # ── Build heading metadata ────────────────────────────────────────
        headings = chunk.meta.headings or []
        chunks.append({
            "chunk_type": "text",
            "text": text,
            "is_atomic": False,
            "metadata": {
                **doc_meta,
                "section_title": headings[-1] if headings else "",
                "heading_path": " > ".join(headings),  # "" when no headings
                "page_num": page_num,
            },
        })

    if dropped_mixed:
        log.warning(
            f"  {dropped_mixed} chunk(s) in {Path(docling_doc_path).name} mixed "
            f"table and non-table items and were dropped; their prose is not indexed."
        )
    return chunks
# ──────────────────────────────────────────────────────────────────────────────
# TABLE CHUNKING — Atomic markdown pass-through
# ──────────────────────────────────────────────────────────────────────────────
# Why markdown and not HybridChunker's key=value format?
#
# HybridChunker serialises a table cell as:
#   "Goldman Sachs, Top 10 Holdings.Ticker = GS. Goldman Sachs, Top 10
#    Holdings.Sector = Financial Services."
#
# Our markdown representation:
#   | Company       | Ticker | Sector             |
#   |---------------|--------|--------------------|
#   | Goldman Sachs | GS     | Financial Services |
#
# LLMs are trained on markdown and read it far more accurately than the
# key=value format, so a query about "Goldman Sachs sector" against the
# markdown chunk is much more likely to get a correct, well-grounded answer.
# ──────────────────────────────────────────────────────────────────────────────
def chunk_tables_atomic(tables: list[dict], doc_meta: dict) -> list[dict]:
    """
    Convert extracted tables into atomic chunks using markdown representation.

    Each table = exactly one chunk. The `is_atomic` flag instructs downstream
    systems to never split this chunk. Tables with missing, null or blank
    markdown are skipped.

    Args:
        tables   : list of table dicts from processed JSON (with 'markdown'
                   key); None is treated as an empty list
        doc_meta : document-level metadata dict, copied into every chunk

    Returns:
        list of table chunk dicts (chunk_type="table"); metadata holds
        doc_meta plus page_num, table_index, rows, cols, col_headers
        (missing table keys become None, col_headers defaults to []).
    """
    chunks: list[dict] = []
    for t in tables or []:
        # `or ""` also covers an explicit "markdown": null, which .strip()
        # would otherwise crash on.
        markdown = (t.get("markdown") or "").strip()
        if not markdown:
            continue
        chunks.append({
            "chunk_type": "table",
            "text": markdown,
            "is_atomic": True,
            "metadata": {
                **doc_meta,
                "page_num": t.get("page_num"),
                "table_index": t.get("index"),
                "rows": t.get("rows"),
                "cols": t.get("cols"),
                "col_headers": t.get("headers", []),
            },
        })
    return chunks
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CHUNK ID ASSIGNMENT | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # IDs are deterministic: {doc_stem}_{type}_{zero_padded_index} | |
| # Deterministic IDs allow ChromaDB upserts (add-or-update) without duplicates | |
| # when the chunker is re-run after adding new documents. | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _assign_ids(chunks: list[dict], doc_stem: str) -> list[dict]: | |
| """Attach chunk_id and doc_id to every chunk in-place.""" | |
| text_idx = table_idx = 0 | |
| for chunk in chunks: | |
| if chunk["chunk_type"] == "text": | |
| chunk["chunk_id"] = f"{doc_stem}_text_{text_idx:04d}" | |
| text_idx += 1 | |
| else: | |
| chunk["chunk_id"] = f"{doc_stem}_table_{table_idx:03d}" | |
| table_idx += 1 | |
| chunk["doc_id"] = doc_stem | |
| return chunks | |
# ──────────────────────────────────────────────────────────────────────────────
# MAIN CHUNKER CLASS
# ──────────────────────────────────────────────────────────────────────────────
class DocumentChunker:
    """
    End-to-end document chunker.

    Loads a processed JSON + its paired _docling.json (from PDFProcessor),
    applies HybridChunker to text sections, passes tables through atomically
    as markdown, and saves a ``<stem>_chunks.json`` under ``chunks_dir``,
    mirroring the sub-directory the input had under the processed root.
    """

    def __init__(self, chunks_dir: Path = CHUNKS_DIR):
        """
        Args:
            chunks_dir : root directory for chunk outputs; created if missing
        """
        self.chunks_dir = Path(chunks_dir)
        self.chunks_dir.mkdir(parents=True, exist_ok=True)

    # ── helpers ──────────────────────────────────────────────────────────────
    def _output_path(self, json_path: Path, processed_dir: str | Path | None) -> Path:
        """Map a (resolved) processed JSON path to its ``*_chunks.json`` path.

        Files under ``processed_dir`` (default PROCESSED_DIR) keep their
        sub-directory, e.g. processed/morningstar/x.json →
        chunks/morningstar/x_chunks.json. Files outside it are written directly
        under ``chunks_dir`` rather than raising ValueError.
        """
        root = Path(PROCESSED_DIR if processed_dir is None else processed_dir).resolve()
        try:
            rel_parent = json_path.relative_to(root).parent
        except ValueError:
            rel_parent = Path()  # not under root → flat layout
        return self.chunks_dir / rel_parent / f"{json_path.stem}_chunks.json"

    @staticmethod
    def _load_cached(out_path: Path) -> list[dict] | None:
        """Return the chunks stored in ``out_path``, or None if it is unreadable."""
        try:
            with open(out_path, encoding="utf-8") as f:
                return json.load(f)["chunks"]
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers JSONDecodeError and UnicodeDecodeError, e.g. a
            # truncated file or one written under a non-UTF-8 locale.
            log.warning(f"  Cached {out_path.name} is unreadable ({e}); re-chunking")
            return None

    @staticmethod
    def _write_json_atomic(path: Path, payload: dict) -> None:
        """Write ``payload`` to ``path`` as UTF-8 JSON via a temp file + rename.

        A crash mid-write leaves only ``<name>.tmp`` behind, so the SKIP check
        in chunk_document never mistakes a truncated file for finished output.
        UTF-8 is explicit because ``ensure_ascii=False`` can emit characters the
        platform default encoding cannot represent.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = path.with_name(path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        tmp_path.replace(path)  # same directory → atomic rename on POSIX

    # ── public API ───────────────────────────────────────────────────────────
    def chunk_document(
        self,
        json_path: str | Path,
        force: bool = False,
        processed_dir: str | Path | None = None,
    ) -> list[dict]:
        """
        Chunk a single processed document.

        Args:
            json_path     : path to the processed JSON (output of PDFProcessor)
            force         : if True, re-chunk even if output already exists
            processed_dir : root whose sub-directory layout is mirrored under
                            chunks_dir (default PROCESSED_DIR)

        Returns:
            list of chunk dicts; empty if the paired _docling.json is missing
            (in that case nothing is written)
        """
        json_path = Path(json_path).resolve()
        out_path = self._output_path(json_path, processed_dir)
        doc_stem = json_path.stem

        if out_path.exists() and not force:
            cached = self._load_cached(out_path)
            if cached is not None:
                log.info(f"SKIP {json_path.name} (already chunked → {out_path.name})")
                return cached

        log.info(f"Chunking: {json_path.name}")

        # ── Load processed JSON ───────────────────────────────────────────
        with open(json_path, encoding="utf-8") as f:
            doc = json.load(f)
        doc_meta = doc["metadata"]
        # `or []` also tolerates explicit nulls in the processed JSON.
        tables = doc.get("tables") or []
        removed_pages = set(doc_meta.get("removed_pages") or [])

        # ── Locate paired _docling.json ───────────────────────────────────
        docling_path = json_path.with_name(json_path.stem + "_docling.json")
        if not docling_path.exists():
            log.warning(
                f" _docling.json not found for {json_path.name}. "
                f"Re-run pdf_processor.py with force=True to regenerate it."
            )
            return []

        # ── Text chunks via HybridChunker ─────────────────────────────────
        text_chunks = chunk_text_with_hybrid(docling_path, doc_meta, removed_pages)
        log.info(f" Text chunks : {len(text_chunks)}")

        # ── Table chunks via atomic markdown pass-through ─────────────────
        table_chunks = chunk_tables_atomic(tables, doc_meta)
        log.info(f" Table chunks : {len(table_chunks)}")

        # ── Combine, assign IDs, save ─────────────────────────────────────
        all_chunks = _assign_ids(text_chunks + table_chunks, doc_stem)
        self._write_json_atomic(out_path, {
            "doc_id": doc_stem,
            "source_file": str(json_path),
            "chunked_at": datetime.now(timezone.utc).isoformat(),
            "total_chunks": len(all_chunks),
            "text_chunks": len(text_chunks),
            "table_chunks": len(table_chunks),
            "chunks": all_chunks,
        })

        size_kb = out_path.stat().st_size / 1024
        log.info(f" Saved → {out_path.name} ({size_kb:.1f} KB) "
                 f"[{len(text_chunks)} text + {len(table_chunks)} table = "
                 f"{len(all_chunks)} total]")
        return all_chunks

    def chunk_all(
        self,
        processed_dir: Path = PROCESSED_DIR,
        force: bool = False,
    ) -> dict:
        """Chunk all processed JSON files under ``processed_dir``.

        The paired ``*_docling.json`` files are skipped. A document that raises
        is logged (with traceback) and recorded with a count of 0 so the batch
        continues.

        Returns:
            {filename: chunk_count}. NOTE(review): keys are bare file names, so
            same-named files in different sub-directories overwrite each other.
        """
        processed_dir = Path(processed_dir)
        json_files = sorted(
            p for p in processed_dir.rglob("*.json")
            if not p.stem.endswith("_docling")  # main processed JSONs only
        )
        log.info(f"Found {len(json_files)} processed documents")

        summary = {}
        for jf in json_files:
            try:
                chunks = self.chunk_document(jf, force=force, processed_dir=processed_dir)
            except Exception as e:
                # Batch boundary: one bad document must not stop the run, but
                # keep the traceback so the failure can be diagnosed.
                log.exception(f" FAILED {jf.name}: {e}")
                summary[jf.name] = 0
            else:
                summary[jf.name] = len(chunks)
        return summary
# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    # The only flag understood: --force re-chunks documents that already
    # have a chunks JSON.
    force = "--force" in sys.argv
    rule = "=" * 60

    log.info(rule)
    log.info("Phase 3 — Document Chunker (HybridChunker + atomic tables)")
    log.info(rule)

    chunker = DocumentChunker()
    summary = chunker.chunk_all(force=force)

    log.info("\n" + rule)
    log.info("Chunking complete.")
    for fname, n in summary.items():
        log.info(f" {fname:50s} {n:>5} chunks")
    log.info(f" {'TOTAL':50s} {sum(summary.values()):>5} chunks")
    log.info(rule)