File size: 7,616 Bytes
e97c8d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import json
import os
import base64
from io import BytesIO
from typing import List, Dict, Any
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, PictureDescriptionApiOptions
from docling_core.types.doc.labels import DocItemLabel
from docling_core.types.doc.document import SectionHeaderItem, TitleItem
from config import GROQ_API_KEY
from docling.chunking import HybridChunker
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.settings import settings
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
    OcrAutoOptions 
)

class EnrichedRagParser:
    """
    Parser using Docling's HybridChunker for Multimodal RAG.

    Converts a PDF into (a) structured text chunks with page/section
    metadata and (b) enriched image records (base64 payload, VLM-generated
    description, nearest section header), suitable for indexing into a
    multimodal RAG knowledge base.

    Modified from sonnet_export.py for modular use.
    """

    def __init__(self, groq_api_key: str = GROQ_API_KEY):
        # Key is forwarded as a Bearer token to the Groq picture-description API.
        self.groq_api_key = groq_api_key
        self.converter = self._setup_converter()
        # merge_peers=True lets the chunker merge small adjacent chunks
        # that share the same headings into one chunk.
        self.chunker = HybridChunker(merge_peers=True)

    def _setup_converter(self) -> DocumentConverter:
        """Build a PDF-only DocumentConverter: CPU accel, smart OCR,
        table structure, and remote (Groq) picture descriptions."""

        # CPU configuration. os.cpu_count() may return None on some
        # platforms, which would make min(12, None) raise TypeError —
        # fall back to a conservative 4 threads in that case.
        accelerator_options = AcceleratorOptions(
            num_threads=min(12, os.cpu_count() or 4),
            device=AcceleratorDevice.CPU,
        )

        # Smart OCR configuration: only trigger OCR when more than 50%
        # of a page's area is scanned/bitmap content.
        ocr_options = OcrAutoOptions(
            lang=["en"],                        # restrict OCR to English
            force_full_page_ocr=False,          # don't force OCR on every page
            bitmap_area_threshold=0.5,          # OCR only if >50% scanned
        )

        # Pipeline configuration.
        pipeline_options = PdfPipelineOptions(
            # Features
            do_ocr=True,                        # OCR enabled, but gated by ocr_options
            do_table_structure=True,
            generate_picture_images=True,       # needed for base64 export later
            images_scale=1,
            ocr_options=ocr_options,

            # Disable unnecessary features
            generate_page_images=False,
            # Must be True so the remote Groq description endpoint is allowed.
            enable_remote_services=True,

            # Picture descriptions via remote API (configured below)
            do_picture_description=True,

            # Resource management
            queue_max_size=10,
            document_timeout=300.0,             # seconds; per-document cap
        )

        pipeline_options.accelerator_options = accelerator_options
        # Enables result.timings (read in process_document).
        settings.debug.profile_pipeline_timings = True

        pipeline_options.picture_description_options = PictureDescriptionApiOptions(
            url="https://api.groq.com/openai/v1/chat/completions",
            params={
                # NOTE(review): verify this model string is still served by Groq.
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
                "temperature": 0.2,
                "max_tokens": 500,
            },
            prompt="Describe this image in detail for a RAG knowledge base. Include all visible text, numbers, and chart trends.",
            headers={"Authorization": f"Bearer {self.groq_api_key}"},
        )

        return DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )

    def _determine_chunk_type(self, chunk) -> str:
        """Classify a chunk as table/list/header/code/text from the labels
        of its constituent doc items (priority: table > list > header > code)."""
        chunk_type = "text"
        if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
            labels = [item.label for item in chunk.meta.doc_items]
            if DocItemLabel.TABLE in labels:
                chunk_type = "table"
            elif DocItemLabel.LIST_ITEM in labels:
                chunk_type = "list"
            elif any(l in (DocItemLabel.TITLE, DocItemLabel.SECTION_HEADER) for l in labels):
                chunk_type = "header"
            elif DocItemLabel.CODE in labels:
                chunk_type = "code"
        return chunk_type

    def _get_base64_image(self, pic) -> str:
        """Return the picture's PIL image as a base64-encoded PNG string,
        or "" if no image is attached or the conversion fails."""
        try:
            if hasattr(pic, "image") and pic.image and hasattr(pic.image, "pil_image"):
                img = pic.image.pil_image
                if img:
                    buffered = BytesIO()
                    # PNG export requires RGB; RGBA/P/L modes are converted.
                    if img.mode != "RGB":
                        img = img.convert("RGB")
                    img.save(buffered, format="PNG")
                    return base64.b64encode(buffered.getvalue()).decode("utf-8")
        except Exception as e:
            # Best-effort: a single bad image must not abort the whole document.
            print(f"Failed to convert image to base64: {e}")
        return ""

    def _find_image_heading(self, doc, pic_item) -> str:
        """Return the text of the last section header / title that appears
        before pic_item in reading order, or "Unknown" if none precedes it.

        NOTE: O(doc items) per call; acceptable for typical documents but
        quadratic across many pictures in a very large document.
        """
        current_heading = "Unknown"
        for item, level in doc.iterate_items():
            if isinstance(item, (SectionHeaderItem, TitleItem)):
                if hasattr(item, 'text'):
                    current_heading = item.text
            if item == pic_item:
                return current_heading
        return current_heading

    def _extract_picture_description(self, pic) -> str:
        """Pull the VLM-generated description off a PictureItem.

        Newer docling-core stores API/VLM output in pic.annotations
        (PictureDescriptionData objects with a .text field); older code
        paths exposed pic.meta.description. Try annotations first, then
        fall back to the legacy attribute. TODO(review): confirm which
        path the installed docling version actually populates.
        """
        for ann in getattr(pic, "annotations", None) or []:
            text = getattr(ann, "text", None)
            if text:
                return text
        if hasattr(pic, "meta") and pic.meta and hasattr(pic.meta, "description"):
            desc_obj = pic.meta.description
            return desc_obj.text if hasattr(desc_obj, "text") else str(desc_obj)
        return "No description"

    def process_document(self, file_path: str, save_json: bool = True, output_dir: str = "rag_data", max_page: int = 10) -> Dict[str, Any]:
        """Convert a document and return {"chunks": [...], "images": [...]}.

        Args:
            file_path: Path to the input document (PDF).
            save_json: If True, also write the result to
                <output_dir>/parsed_knowledge.json.
            output_dir: Directory for the JSON dump (created if missing).
            max_page: Currently UNUSED — conversion always processes the
                whole document. TODO(review): wire through
                DocumentConverter.convert(..., max_num_pages=max_page) once
                callers expect truncation; enabling it now would silently
                drop pages for existing callers.

        Returns:
            Dict with "chunks" (text chunks + metadata) and "images"
            (description, base64 payload, page, section header).
        """
        print(f"Testing Docling Parser on: {file_path}...")

        result = self.converter.convert(file_path)
        doc = result.document

        # timings[...].times is a LIST of per-run durations (profiling was
        # enabled in _setup_converter); sum it for a meaningful total.
        profile = result.timings.get("pipeline_total")
        if profile is not None:
            print(f"Doc conversion time: {sum(profile.times):.2f} seconds")

        chunk_iter = self.chunker.chunk(dl_doc=doc)

        structured_chunks = []
        for i, chunk in enumerate(chunk_iter):
            heading = chunk.meta.headings[0] if chunk.meta.headings else "Unknown"

            # First available provenance entry gives the chunk's page number.
            page_num = 0
            if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
                for item in chunk.meta.doc_items:
                    if hasattr(item, "prov") and item.prov:
                        if len(item.prov) > 0 and hasattr(item.prov[0], "page_no"):
                            page_num = item.prov[0].page_no
                            break

            structured_chunks.append({
                "chunk_id": f"chunk_{i}",
                "type": self._determine_chunk_type(chunk),
                "text": chunk.text,
                "metadata": {
                    "source": os.path.basename(file_path),
                    "page_number": page_num,
                    "section_header": heading
                }
            })

        images_data = []
        for i, pic in enumerate(doc.pictures):
            images_data.append({
                "image_id": f"img_{i}",
                "description": self._extract_picture_description(pic),
                "page_number": pic.prov[0].page_no if pic.prov else 0,
                "section_header": self._find_image_heading(doc, pic),
                "image_base64": self._get_base64_image(pic)
            })

        final_output = {"chunks": structured_chunks, "images": images_data}

        if save_json:
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, "parsed_knowledge.json"), "w", encoding="utf-8") as f:
                json.dump(final_output, f, indent=2, ensure_ascii=False)
            print(f"Saved parsed knowledge to {output_dir}/parsed_knowledge.json")

        return final_output