# NOTE: non-Python page residue (Hugging Face Spaces header, file size, commit
# hash, and a copied line-number gutter) was removed from this spot so the
# module parses; original source file was ~7,616 bytes at commit e97c8d1.
import json
import os
import base64
from io import BytesIO
from typing import List, Dict, Any
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, PictureDescriptionApiOptions
from docling_core.types.doc.labels import DocItemLabel
from docling_core.types.doc.document import SectionHeaderItem, TitleItem
from config import GROQ_API_KEY
from docling.chunking import HybridChunker
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.settings import settings
from docling.datamodel.pipeline_options import (
PdfPipelineOptions,
OcrAutoOptions
)
class EnrichedRagParser:
    """Parse documents into multimodal RAG data with Docling's HybridChunker.

    The PDF pipeline runs on CPU, applies OCR only to pages that are mostly
    scanned bitmap content, and sends embedded pictures to the Groq
    OpenAI-compatible chat API for textual descriptions.

    Adapted from sonnet_export.py for modular use.
    """

    def __init__(self, groq_api_key: str = GROQ_API_KEY):
        """Create the configured converter and chunker.

        Args:
            groq_api_key: Bearer token for the Groq picture-description API.
        """
        self.groq_api_key = groq_api_key
        self.converter = self._setup_converter()
        # merge_peers=True lets the chunker merge adjacent undersized chunks
        # that share the same headings.
        self.chunker = HybridChunker(merge_peers=True)

    def _setup_converter(self) -> DocumentConverter:
        """Build a DocumentConverter: CPU accel, smart OCR, VLM picture captions."""
        # os.cpu_count() can return None on some platforms; fall back to 4
        # rather than crashing min() with a None operand.
        accelerator_options = AcceleratorOptions(
            num_threads=min(12, os.cpu_count() or 4),
            device=AcceleratorDevice.CPU,
        )

        # Smart OCR: only triggers on pages where >50% of the area is
        # scanned/bitmap content instead of forcing OCR everywhere.
        ocr_options = OcrAutoOptions(
            lang=["en"],
            force_full_page_ocr=False,
            bitmap_area_threshold=0.5,
        )

        pipeline_options = PdfPipelineOptions(
            do_ocr=True,                    # enabled, but gated by ocr_options above
            do_table_structure=True,
            generate_picture_images=True,   # needed for base64 export below
            images_scale=1,
            ocr_options=ocr_options,
            generate_page_images=False,     # full-page renders are not consumed
            enable_remote_services=True,    # required for the remote Groq captioner
            do_picture_description=True,
            queue_max_size=10,
            document_timeout=300.0,         # abort a single document after 5 min
        )
        pipeline_options.accelerator_options = accelerator_options

        # Collect per-stage timings so process_document can report them.
        settings.debug.profile_pipeline_timings = True

        pipeline_options.picture_description_options = PictureDescriptionApiOptions(
            url="https://api.groq.com/openai/v1/chat/completions",
            params={
                # TODO(review): confirm this model id is still served by Groq.
                "model": "meta-llama/llama-4-scout-17b-16e-instruct",
                "temperature": 0.2,
                "max_tokens": 500,
            },
            prompt="Describe this image in detail for a RAG knowledge base. Include all visible text, numbers, and chart trends.",
            headers={"Authorization": f"Bearer {self.groq_api_key}"},
        )

        return DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )

    def _determine_chunk_type(self, chunk) -> str:
        """Classify a chunk from its doc-item labels.

        Precedence: table > list > header > code; anything else is "text".
        """
        if not (hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items):
            return "text"
        labels = [item.label for item in chunk.meta.doc_items]
        if DocItemLabel.TABLE in labels:
            return "table"
        if DocItemLabel.LIST_ITEM in labels:
            return "list"
        if DocItemLabel.TITLE in labels or DocItemLabel.SECTION_HEADER in labels:
            return "header"
        if DocItemLabel.CODE in labels:
            return "code"
        return "text"

    def _get_base64_image(self, pic) -> str:
        """Return the picture's PIL image as a base64-encoded PNG, or "" on failure."""
        try:
            if hasattr(pic, "image") and pic.image and hasattr(pic.image, "pil_image"):
                img = pic.image.pil_image
                if img:
                    if img.mode != "RGB":
                        img = img.convert("RGB")  # normalize palettized/CMYK images
                    buffered = BytesIO()
                    img.save(buffered, format="PNG")
                    return base64.b64encode(buffered.getvalue()).decode("utf-8")
        except Exception as e:
            # Deliberately broad: one corrupt image must not abort the document.
            print(f"Failed to convert image to base64: {e}")
        return ""

    def _find_image_heading(self, doc, pic_item) -> str:
        """Return the most recent section header/title preceding pic_item.

        Walks the document in reading order; falls back to "Unknown" when no
        heading precedes the picture (or the picture is not found).
        """
        current_heading = "Unknown"
        for item, _level in doc.iterate_items():
            if isinstance(item, (SectionHeaderItem, TitleItem)) and hasattr(item, "text"):
                current_heading = item.text
            # Identity, not ==: pydantic equality would match any picture with
            # identical field values, returning the first duplicate's heading.
            if item is pic_item:
                return current_heading
        return current_heading

    def _map_picture_headings(self, doc) -> Dict[int, str]:
        """Single pass over the document mapping id(item) -> governing heading.

        Replaces calling _find_image_heading once per picture, which re-walked
        the entire document each time (O(items * pictures)).
        """
        headings: Dict[int, str] = {}
        current = "Unknown"
        for item, _level in doc.iterate_items():
            if isinstance(item, (SectionHeaderItem, TitleItem)) and hasattr(item, "text"):
                current = item.text
            headings[id(item)] = current
        return headings

    @staticmethod
    def _first_page_number(chunk) -> int:
        """Page number of the chunk's first provenanced doc item, 0 if unknown."""
        if hasattr(chunk.meta, "doc_items") and chunk.meta.doc_items:
            for item in chunk.meta.doc_items:
                prov = getattr(item, "prov", None)
                if prov and hasattr(prov[0], "page_no"):
                    return prov[0].page_no
        return 0

    def process_document(self, file_path: str, save_json: bool = True,
                         output_dir: str = "rag_data", max_page: int = 10) -> Dict[str, Any]:
        """Convert a document into RAG-ready chunks plus described images.

        Args:
            file_path: Path to the input document (PDF).
            save_json: Also write the result to <output_dir>/parsed_knowledge.json.
            output_dir: Directory for the JSON dump (created if missing).
            max_page: Currently UNUSED — TODO(review): wire to the converter's
                page-limit option or remove; kept for call-site compatibility.

        Returns:
            {"chunks": [...], "images": [...]} with per-item metadata.
        """
        print(f"Testing Docling Parser on: {file_path}...")
        result = self.converter.convert(file_path)
        doc = result.document

        # `.times` is a list of per-run durations; report the total instead of
        # printing the raw list labeled "seconds".
        timings = result.timings["pipeline_total"].times
        print(f"Doc conversion time: {sum(timings):.2f} seconds")

        structured_chunks = []
        for i, chunk in enumerate(self.chunker.chunk(dl_doc=doc)):
            heading = chunk.meta.headings[0] if chunk.meta.headings else "Unknown"
            structured_chunks.append({
                "chunk_id": f"chunk_{i}",
                "type": self._determine_chunk_type(chunk),
                "text": chunk.text,
                "metadata": {
                    "source": os.path.basename(file_path),
                    "page_number": self._first_page_number(chunk),
                    "section_header": heading,
                },
            })

        heading_by_item = self._map_picture_headings(doc)
        images_data = []
        for i, pic in enumerate(doc.pictures):
            description = "No description"
            # NOTE(review): newer docling releases expose captions via
            # pic.annotations rather than pic.meta.description — verify against
            # the pinned docling version.
            if hasattr(pic, "meta") and pic.meta and hasattr(pic.meta, "description"):
                desc_obj = pic.meta.description
                description = desc_obj.text if hasattr(desc_obj, "text") else str(desc_obj)
            images_data.append({
                "image_id": f"img_{i}",
                "description": description,
                "page_number": pic.prov[0].page_no if pic.prov else 0,
                "section_header": heading_by_item.get(id(pic), "Unknown"),
                "image_base64": self._get_base64_image(pic),
            })

        final_output = {"chunks": structured_chunks, "images": images_data}
        if save_json:
            os.makedirs(output_dir, exist_ok=True)
            out_path = os.path.join(output_dir, "parsed_knowledge.json")
            with open(out_path, "w", encoding="utf-8") as f:
                json.dump(final_output, f, indent=2, ensure_ascii=False)
            print(f"Saved parsed knowledge to {output_dir}/parsed_knowledge.json")
        return final_output
|