import base64
import json
import os
from io import BytesIO
from typing import Any, Dict, List

from docling.chunking import HybridChunker
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    OcrAutoOptions,
    PdfPipelineOptions,
    PictureDescriptionApiOptions,
)
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling_core.types.doc.document import SectionHeaderItem, TitleItem
from docling_core.types.doc.labels import DocItemLabel

from config import GROQ_API_KEY


class EnrichedRagParser:
    """
    Parser using Docling's HybridChunker for Multimodal RAG.
    Modified from sonnet_export.py for modular use.

    Converts a PDF into (a) text chunks with page/section metadata and
    (b) image records with API-generated descriptions, suitable for
    indexing in a multimodal RAG pipeline.
    """

    def __init__(self, groq_api_key: str = GROQ_API_KEY) -> None:
        """
        Args:
            groq_api_key: Bearer token for the Groq picture-description API.
        """
        self.groq_api_key = groq_api_key
        self.converter = self._setup_converter()
        # merge_peers=True lets the chunker fuse adjacent small items that
        # share the same headings into a single chunk.
        self.chunker = HybridChunker(merge_peers=True)

    def _setup_converter(self) -> DocumentConverter:
        """Build a DocumentConverter with CPU, smart-OCR, and remote
        picture-description settings."""
        # CPU Configuration.
        # NOTE: os.cpu_count() can return None on exotic platforms; fall back
        # to 4 threads so min() never receives None (would raise TypeError).
        accelerator_options = AcceleratorOptions(
            num_threads=min(12, os.cpu_count() or 4),
            device=AcceleratorDevice.CPU,
        )

        # Smart OCR Configuration:
        # only triggers when >50% of the page area is scanned/bitmap content.
        ocr_options = OcrAutoOptions(
            lang=["en"],                 # restrict OCR to English
            force_full_page_ocr=False,   # don't force OCR on every page
            bitmap_area_threshold=0.5,   # OCR only if >50% of page is bitmap
        )

        # Pipeline Configuration
        pipeline_options = PdfPipelineOptions(
            # Features
            do_ocr=True,                 # enabled, but triggered per ocr_options
            do_table_structure=True,
            generate_picture_images=True,
            images_scale=1,
            ocr_options=ocr_options,
            # Disable unnecessary features
            generate_page_images=False,
            enable_remote_services=True,
            # Picture descriptions via a REMOTE API (Groq), configured below.
            do_picture_description=True,
            # Resource management
            queue_max_size=10,
            document_timeout=300.0,
        )
        pipeline_options.accelerator_options = accelerator_options

        # Record per-stage timings so process_document can report them.
        settings.debug.profile_pipeline_timings = True

        pipeline_options.picture_description_options = PictureDescriptionApiOptions(
            url="https://api.groq.com/openai/v1/chat/completions",
            params={
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",  # Double check this model string
                "temperature": 0.2,
                "max_tokens": 500,
            },
            prompt="Describe this image in detail for a RAG knowledge base. Include all visible text, numbers, and chart trends.",
            headers={"Authorization": f"Bearer {self.groq_api_key}"},
        )

        return DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )

    def _determine_chunk_type(self, chunk) -> str:
        """Classify a chunk as table/list/header/code/text from its item labels.

        Precedence: table > list > header > code > text (first match wins).
        """
        chunk_type = "text"
        if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
            labels = [item.label for item in chunk.meta.doc_items]
            if DocItemLabel.TABLE in labels:
                chunk_type = "table"
            elif DocItemLabel.LIST_ITEM in labels:
                chunk_type = "list"
            elif any(l in [DocItemLabel.TITLE, DocItemLabel.SECTION_HEADER] for l in labels):
                chunk_type = "header"
            elif DocItemLabel.CODE in labels:
                chunk_type = "code"
        return chunk_type

    def _get_base64_image(self, pic) -> str:
        """Return the picture's PIL image as a base64-encoded PNG string.

        Best-effort: returns "" if no image is attached or encoding fails.
        """
        try:
            if hasattr(pic, "image") and pic.image and hasattr(pic.image, "pil_image"):
                img = pic.image.pil_image
                if img:
                    buffered = BytesIO()
                    # PNG encoding requires RGB (e.g. CMYK/P sources fail).
                    if img.mode != "RGB":
                        img = img.convert("RGB")
                    img.save(buffered, format="PNG")
                    return base64.b64encode(buffered.getvalue()).decode("utf-8")
        except Exception as e:
            # Deliberate best-effort: a bad image should not abort parsing.
            print(f"Failed to convert image to base64: {e}")
        return ""

    def _find_image_heading(self, doc, pic_item) -> str:
        """Walk the document in reading order and return the section/title
        heading most recently seen before pic_item ("Unknown" if none)."""
        current_heading = "Unknown"
        for item, level in doc.iterate_items():
            if isinstance(item, (SectionHeaderItem, TitleItem)):
                if hasattr(item, 'text'):
                    current_heading = item.text
            if item == pic_item:
                return current_heading
        return current_heading

    def process_document(
        self,
        file_path: str,
        save_json: bool = True,
        output_dir: str = "rag_data",
        max_page: int = 10,
    ) -> Dict[str, Any]:
        """Converts document and returns structured data.

        Args:
            file_path: Path to the input document (PDF).
            save_json: If True, also write the result to
                ``output_dir``/parsed_knowledge.json.
            output_dir: Directory for the JSON output (created if missing).
            max_page: Currently UNUSED — kept for backward compatibility.
                TODO: wire into converter page limiting or remove.

        Returns:
            Dict with "chunks" (text chunks + metadata) and "images"
            (descriptions, headings, base64 PNG data).
        """
        print(f"Testing Docling Parser on: {file_path}...")
        result = self.converter.convert(file_path)
        doc = result.document

        # .times is a list of recorded durations for the pipeline stage.
        doc_conversion_secs = result.timings["pipeline_total"].times
        print(f"Doc conversion time: {doc_conversion_secs} seconds")

        chunk_iter = self.chunker.chunk(dl_doc=doc)
        structured_chunks = []
        for i, chunk in enumerate(chunk_iter):
            heading = chunk.meta.headings[0] if chunk.meta.headings else "Unknown"

            # Use the page number of the first doc item that has provenance.
            page_num = 0
            if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
                for item in chunk.meta.doc_items:
                    if hasattr(item, "prov") and item.prov:
                        if len(item.prov) > 0 and hasattr(item.prov[0], "page_no"):
                            page_num = item.prov[0].page_no
                            break

            structured_chunks.append({
                "chunk_id": f"chunk_{i}",
                "type": self._determine_chunk_type(chunk),
                "text": chunk.text,
                "metadata": {
                    "source": os.path.basename(file_path),
                    "page_number": page_num,
                    "section_header": heading,
                },
            })

        images_data = []
        for i, pic in enumerate(doc.pictures):
            description = "No description"
            if hasattr(pic, "meta") and pic.meta and hasattr(pic.meta, "description"):
                desc_obj = pic.meta.description
                description = desc_obj.text if hasattr(desc_obj, "text") else str(desc_obj)

            images_data.append({
                "image_id": f"img_{i}",
                "description": description,
                "page_number": pic.prov[0].page_no if pic.prov else 0,
                "section_header": self._find_image_heading(doc, pic),
                "image_base64": self._get_base64_image(pic),
            })

        final_output = {"chunks": structured_chunks, "images": images_data}

        if save_json:
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, "parsed_knowledge.json"), "w", encoding="utf-8") as f:
                json.dump(final_output, f, indent=2, ensure_ascii=False)
            print(f"Saved parsed knowledge to {output_dir}/parsed_knowledge.json")

        return final_output