| """ | |
| Generic Pre-Processing Pipeline (GPP) for Document Intelligence | |
| This module handles: | |
| 1. Parsing PDFs via MinerU Python API (OCR/text modes) | |
| 2. Extracting markdown, images, and content_list JSON | |
| 3. Chunking multimodal content (text, tables, images), ensuring tables/images are in single chunks | |
| 4. Parsing markdown tables into JSON 2D structures for dense tables | |
| 5. Narration of tables/images via LLM | |
| 6. Semantic enhancements (deduplication, coreference, metadata summarization) | |
| 7. Embedding computation for in-memory use | |
| Each step is modular to support swapping components (e.g. different parsers or stores). | |
| """ | |
import os
import json
from typing import List, Dict, Any, Optional
import re

from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
import numpy as np
import hnswlib

from src.config import EmbeddingConfig
from src.utils import OpenAIEmbedder
# LLM client abstraction
from src.utils import LLMClient, logger

def parse_markdown_table(md: str) -> Optional[Dict[str, Any]]:
    """
    Parses a markdown table into a JSON-like dict:
        {"headers": [...], "rows": [[...], ...]}
    Returns None if the input is not a well-formed markdown table.
    """
    lines = [l for l in md.strip().splitlines() if l.strip().startswith("|")]
    if len(lines) < 2:
        return None
    header_line = lines[0]
    sep_line = lines[1]
    # Validate the header/body separator line (e.g. "| --- | :---: |")
    if not re.match(r"^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)+\|?", sep_line):
        return None

    def split_row(line):
        # Strip outer pipes, then split cells on the remaining pipes
        return [cell.strip() for cell in line.strip().strip("|").split("|")]

    headers = split_row(header_line)
    rows = [split_row(r) for r in lines[2:]]
    return {"headers": headers, "rows": rows}

class GPPConfig:
    """
    Configuration for the GPP pipeline.
    """
    CHUNK_TOKEN_SIZE = 256
    DEDUP_SIM_THRESHOLD = 0.9
    EXPANSION_SIM_THRESHOLD = 0.85
    COREF_CONTEXT_SIZE = 3
    HNSW_EF_CONSTRUCTION = int(os.getenv("HNSW_EF_CONSTRUCTION", "200"))
    HNSW_M = int(os.getenv("HNSW_M", "16"))
    HNSW_EF_SEARCH = int(os.getenv("HNSW_EF_SEARCH", "50"))
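
# The HNSW knobs above are read from the process environment at import time, so
# they can be overridden without code changes, e.g. (hypothetical values):
#   HNSW_EF_CONSTRUCTION=400 HNSW_M=32 HNSW_EF_SEARCH=100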

class GPP:
    def __init__(self, config: GPPConfig):
        self.config = config
        # Embedding models
        if EmbeddingConfig.PROVIDER == "openai":
            self.text_embedder = OpenAIEmbedder(EmbeddingConfig.TEXT_MODEL)
            self.meta_embedder = OpenAIEmbedder(EmbeddingConfig.META_MODEL)
        else:
            self.text_embedder = SentenceTransformer(
                EmbeddingConfig.TEXT_MODEL, use_auth_token=True
            )
            self.meta_embedder = SentenceTransformer(
                EmbeddingConfig.META_MODEL, use_auth_token=True
            )
        self.bm25 = None

    def parse_pdf(self, pdf_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Uses the MinerU API to parse the PDF in OCR/text mode and
        dumps markdown, images, layout/span PDFs, and content_list JSON.
        Returns parsed data plus file paths for UI traceability.
        """
        name = os.path.splitext(os.path.basename(pdf_path))[0]
        img_dir = os.path.join(output_dir, "images")
        os.makedirs(img_dir, exist_ok=True)
        os.makedirs(output_dir, exist_ok=True)
        writer_imgs = FileBasedDataWriter(img_dir)
        writer_md = FileBasedDataWriter(output_dir)
        reader = FileBasedDataReader("")
        pdf_bytes = reader.read(pdf_path)
        ds = PymuDocDataset(pdf_bytes)
        if ds.classify() == SupportedPdfParseMethod.OCR:
            infer = ds.apply(doc_analyze, ocr=True)
            pipe = infer.pipe_ocr_mode(writer_imgs)
        else:
            infer = ds.apply(doc_analyze, ocr=False)
            pipe = infer.pipe_txt_mode(writer_imgs)
        # Visual debugging outputs: layout and span overlays
        pipe.draw_layout(os.path.join(output_dir, f"{name}_layout.pdf"))
        pipe.draw_span(os.path.join(output_dir, f"{name}_spans.pdf"))
        # Dump markdown & content_list JSON
        pipe.dump_md(writer_md, f"{name}.md", os.path.basename(img_dir))
        pipe.dump_content_list(
            writer_md, f"{name}_content_list.json", os.path.basename(img_dir)
        )
        content_list_path = os.path.join(output_dir, f"{name}_content_list.json")
        with open(content_list_path, "r", encoding="utf-8") as f:
            blocks = json.load(f)
        # Paths returned for UI traceability
        return {
            "blocks": blocks,
            "md_path": os.path.join(output_dir, f"{name}.md"),
            "images_dir": img_dir,
            "layout_pdf": os.path.join(output_dir, f"{name}_layout.pdf"),
            "spans_pdf": os.path.join(output_dir, f"{name}_spans.pdf"),
        }

    def chunk_blocks(self, blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Creates chunks of ~CHUNK_TOKEN_SIZE tokens, but ensures any table/image block
        becomes its own chunk (unsplittable), flushing the current text chunk as needed.
        """
        chunks, current, token_count = [], {"text": "", "type": None, "blocks": []}, 0
        for blk in blocks:
            btype = blk.get("type")
            text = blk.get("text", "")
            if btype in ("table", "img_path"):
                # Flush the existing text chunk
                if current["blocks"]:
                    chunks.append(current)
                    current = {"text": "", "type": None, "blocks": []}
                    token_count = 0
                # Create an isolated chunk for the table/image
                tbl_chunk = {"text": text, "type": btype, "blocks": [blk]}
                # Parse markdown table into a JSON structure if applicable
                if btype == "table":
                    tbl_chunk["table_structure"] = parse_markdown_table(text)
                chunks.append(tbl_chunk)
                continue
            # Standard text accumulation
            count = len(text.split())
            if token_count + count > self.config.CHUNK_TOKEN_SIZE and current["blocks"]:
                chunks.append(current)
                current = {"text": "", "type": None, "blocks": []}
                token_count = 0
            current["text"] += text + "\n"
            current["type"] = current["type"] or btype
            current["blocks"].append(blk)
            token_count += count
        # Flush the remainder
        if current["blocks"]:
            chunks.append(current)
        logger.info(f"Chunked into {len(chunks)} pieces (with tables/images isolated).")
        return chunks
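
    # Illustrative chunk shapes produced above: a text chunk looks like
    #   {"text": "...accumulated paragraphs...", "type": "text", "blocks": [...]}
    # while a table chunk additionally carries its parsed structure:
    #   {"text": "| A | B | ...", "type": "table", "blocks": [blk],
    #    "table_structure": {"headers": [...], "rows": [[...], ...]}}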

    def narrate_multimodal(self, chunks: List[Dict[str, Any]]) -> None:
        """
        For table/image chunks, generate an LLM narration. Text chunks keep their raw
        text as the narration; any table_structure remains on the chunk for later use.
        """
        for c in chunks:
            if c["type"] in ("table", "img_path"):
                prompt = f"Describe this {c['type']} concisely:\n{c['text']}"
                c["narration"] = LLMClient.generate(prompt)
            else:
                c["narration"] = c["text"]

    def deduplicate(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Drop chunks whose narration has cosine similarity above DEDUP_SIM_THRESHOLD
        with an already-kept chunk.
        """
        try:
            narrations = [c.get("narration", "") for c in chunks]
            if EmbeddingConfig.PROVIDER == "openai":
                embs = self.text_embedder.embed(narrations)
            else:
                embs = self.text_embedder.encode(narrations)
            # Ensure a numpy matrix (the OpenAI embedder may return plain lists)
            embs = np.asarray(embs, dtype=np.float32)
            keep = []
            for i, emb in enumerate(embs):
                if not any(
                    float(emb @ embs[j])
                    / (np.linalg.norm(emb) * np.linalg.norm(embs[j]) + 1e-8)
                    > self.config.DEDUP_SIM_THRESHOLD
                    for j in keep
                ):
                    keep.append(i)
            deduped = [chunks[i] for i in keep]
            logger.info(f"Deduplicated: {len(chunks)}→{len(deduped)}")
            return deduped
        except Exception as e:
            logger.error(f"Deduplication failed: {e}")
            return chunks

    def coref_resolution(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Rewrite pronouns in each narration, using the previous COREF_CONTEXT_SIZE
        narrations as context.
        """
        for idx, c in enumerate(chunks):
            start = max(0, idx - self.config.COREF_CONTEXT_SIZE)
            ctx = "\n".join(chunks[i].get("narration", "") for i in range(start, idx))
            prompt = f"Context:\n{ctx}\nRewrite pronouns in:\n{c.get('narration', '')}"
            try:
                c["narration"] = LLMClient.generate(prompt)
            except Exception as e:
                logger.error(f"Coref resolution failed for chunk {idx}: {e}")

    def metadata_summarization(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Group chunks by section, summarize each section with the LLM, and attach
        the summary to every chunk's metadata as `section_summary`.
        """
        sections: Dict[str, List[Dict[str, Any]]] = {}
        for c in chunks:
            sec = c.get("section", "default")
            sections.setdefault(sec, []).append(c)
        for sec, items in sections.items():
            blob = "\n".join(i.get("narration", "") for i in items)
            try:
                summ = LLMClient.generate(f"Summarize this section:\n{blob}")
                for i in items:
                    i.setdefault("metadata", {})["section_summary"] = summ
            except Exception as e:
                logger.error(f"Metadata summarization failed for section {sec}: {e}")

    def build_bm25(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Build a BM25 index on token lists for sparse retrieval.
        """
        tokenized = [c["narration"].split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)
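
    # Example (a minimal sketch): once built, the index can score a query against all
    # chunk narrations via rank_bm25's get_scores, e.g. for a GPP instance `gpp`:
    #   scores = gpp.bm25.get_scores("quarterly revenue".split())
    #   best_chunk_idx = int(np.argmax(scores))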

    def compute_and_store(self, chunks: List[Dict[str, Any]], output_dir: str) -> None:
        """
        1. Compute embeddings for each chunk's narration (text_vec)
           and section_summary (meta_vec).
        2. Build two HNSWlib indices (one for text_vecs, one for meta_vecs).
        3. Save both indices to disk.
        4. Dump human-readable chunk metadata (incl. section_summary)
           for traceability in the UI.
        """
        # --- 1. Prepare embedder ---
        if EmbeddingConfig.PROVIDER.lower() == "openai":
            embedder = OpenAIEmbedder(EmbeddingConfig.TEXT_MODEL)
            embed_fn = embedder.embed
        else:
            st_model = SentenceTransformer(
                EmbeddingConfig.TEXT_MODEL, use_auth_token=True
            )
            embed_fn = lambda texts: st_model.encode(
                texts, show_progress_bar=False
            ).tolist()
        # --- 2. Batch-compute text & meta embeddings ---
        narrations = [c["narration"] for c in chunks]
        # Section summaries are stored under chunk["metadata"] by metadata_summarization;
        # fall back to the narration so no empty string is sent to the embedder.
        meta_texts = [
            c.get("metadata", {}).get("section_summary", "") or c["narration"]
            for c in chunks
        ]
        logger.info(
            "computing_embeddings",
            provider=EmbeddingConfig.PROVIDER,
            num_chunks=len(chunks),
        )
        text_vecs = embed_fn(narrations)
        meta_vecs = embed_fn(meta_texts)
        if len(text_vecs) != len(chunks) or len(meta_vecs) != len(chunks):
            raise RuntimeError(
                f"Embedding count mismatch: text_vecs={len(text_vecs)}, "
                f"meta_vecs={len(meta_vecs)}, chunks={len(chunks)}"
            )
        # Convert to numpy matrices
        text_matrix = np.vstack(text_vecs).astype(np.float32)
        meta_matrix = np.vstack(meta_vecs).astype(np.float32)
        # --- 3. Build HNSW indices ---
        dim = text_matrix.shape[1]
        text_index = hnswlib.Index(space="cosine", dim=dim)
        text_index.init_index(
            max_elements=len(chunks),
            ef_construction=GPPConfig.HNSW_EF_CONSTRUCTION,
            M=GPPConfig.HNSW_M,
        )
        ids = [c["id"] for c in chunks]
        text_index.add_items(text_matrix, ids)
        text_index.set_ef(GPPConfig.HNSW_EF_SEARCH)
        logger.info("text_hnsw_built", elements=len(chunks))
        # Meta index (same dimensionality)
        meta_index = hnswlib.Index(space="cosine", dim=dim)
        meta_index.init_index(
            max_elements=len(chunks),
            ef_construction=GPPConfig.HNSW_EF_CONSTRUCTION,
            M=GPPConfig.HNSW_M,
        )
        meta_index.add_items(meta_matrix, ids)
        meta_index.set_ef(GPPConfig.HNSW_EF_SEARCH)
        logger.info("meta_hnsw_built", elements=len(chunks))
        # --- 4. Persist indices to disk ---
        text_idx_path = os.path.join(output_dir, "hnsw_text_index.bin")
        meta_idx_path = os.path.join(output_dir, "hnsw_meta_index.bin")
        text_index.save_index(text_idx_path)
        meta_index.save_index(meta_idx_path)
        logger.info(
            "hnsw_indices_saved", text_index=text_idx_path, meta_index=meta_idx_path
        )
        # --- 5. Dump chunk metadata for UI traceability ---
        meta_path = os.path.join(output_dir, "chunk_metadata.json")
        metadata = {
            str(c["id"]): {
                "text": c.get("text", ""),
                "narration": c["narration"],
                "type": c.get("type", ""),
                "section_summary": c.get("metadata", {}).get("section_summary", ""),
            }
            for c in chunks
        }
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
        logger.info("chunk_metadata_saved", path=meta_path)

    def run(self, pdf_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Executes the full GPP: parse -> chunk -> narrate -> enhance -> index.
        Returns the parse output dict augmented with `chunks` for downstream processes.
        """
        parsed = self.parse_pdf(pdf_path, output_dir)
        blocks = parsed.get("blocks", [])
        chunks = self.chunk_blocks(blocks)
        # Assign IDs to chunks for traceability
        for idx, chunk in enumerate(chunks):
            chunk["id"] = idx
        self.narrate_multimodal(chunks)
        chunks = self.deduplicate(chunks)
        self.coref_resolution(chunks)
        self.metadata_summarization(chunks)
        self.build_bm25(chunks)
        self.compute_and_store(chunks, output_dir)
        parsed["chunks"] = chunks
        logger.info("GPP pipeline complete.")
        return parsed
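
# Minimal usage sketch (not part of the pipeline API): runs the full GPP on a
# placeholder input. "sample.pdf" and "./gpp_output" are assumed example paths,
# not files shipped with this module.
if __name__ == "__main__":
    gpp = GPP(GPPConfig())
    result = gpp.run("sample.pdf", "./gpp_output")
    print(f"Indexed {len(result['chunks'])} chunks; markdown at {result['md_path']}")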