# hsg_rag_eea / src/pipeline/processors.py
# (Non-code residue from a web file viewer — author, commit message/hash,
#  "raw / history / blame", file size — converted to this comment so the
#  module parses.)
# Standard library
import hashlib
import importlib.util
import json
import os
import re
import time
import unicodedata
from collections import defaultdict
from pathlib import Path

# Third-party
from transformers import AutoTokenizer
from docling.chunking import HybridChunker
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
    RapidOcrOptions,
    LayoutOptions,
)
from docling.document_converter import DocumentConverter, PdfFormatOption, InputFormat
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from docling_core.transforms.serializer.markdown import MarkdownDocSerializer
from docling_core.types.doc.document import DoclingDocument

# Local
from config import BASE_URL, CHUNK_MAX_TOKENS, WeaviateConfiguration as wvtconf
from src.pipeline.utilclasses import ProcessingResult, ProcessingStatus
from src.utils.lang import detect_language
from src.utils.logging import get_logger
# Module-level loggers: one for website ingestion, one for file/document
# ingestion.
weblogger = get_logger("website_processor")
datalogger = get_logger("data_processor")
class ProcessorBase:
    """Base class for ingestion processors.

    Owns the shared docling PDF converter, the hybrid chunker, and the
    per-property strategy modules loaded from disk. Subclasses implement
    `process` for their specific source kind (files, websites, ...).
    """

    def __init__(self, logging_callback) -> None:
        """Set up the converter, the chunker and the property strategies.

        Args:
            logging_callback: Callable taking ``(message: str, progress: int)``,
                used by subclasses to report pipeline progress.
        """
        pipeline_options = PdfPipelineOptions(
            do_ocr=True,
            # Full-page OCR is forced so scanned PDFs get the same treatment
            # as born-digital ones.
            ocr_options=RapidOcrOptions(
                force_full_page_ocr=True,
            ),
            generate_page_images=False,
            images_scale=3.0,
            do_layout_analysis=True,
            do_table_structure=True,
            do_cell_matching=True,
            layout_options=LayoutOptions(
                # NOTE(review): supported_devices is CUDA-only — confirm no
                # CPU fallback is required in deployment.
                model_spec={
                    "name": "docling_layout_egret_medium",
                    "repo_id": "docling-project/docling-layout-egret-medium",
                    "revision": "main",
                    "model_path": "",
                    "supported_devices": ["cuda"]
                },
                create_orphan_clusters=True,
                keep_empty_clusters=False,
                skip_cell_assignment=False,
            ),
        )
        self._converter: DocumentConverter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options),
            },
        )
        tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
        self._chunker = HybridChunker(
            tokenizer=HuggingFaceTokenizer(
                tokenizer=tokenizer,
                max_tokens=CHUNK_MAX_TOKENS
            ),
            max_tokens=CHUNK_MAX_TOKENS,
            merge_peers=True
        )
        self._strategies = self._load_strategies()
        self._logging_callback = logging_callback

    def _load_strategies(self) -> dict:
        """Load one strategy module per configured chunk property.

        Reads ``properties.json`` (written by the database interface) and,
        for each property, imports ``strat_<prop>.py`` from the strategies
        directory. Each strategy module must expose a ``run`` function.

        Returns:
            dict: Mapping of property name to its imported strategy module.

        Raises:
            ValueError: If the properties file or a strategy file is missing,
                or a strategy module has no ``run`` function.
        """
        strategies = {}
        os.makedirs(wvtconf.PROPERTIES_PATH, exist_ok=True)
        os.makedirs(wvtconf.STRATEGIES_PATH, exist_ok=True)
        properties_path = os.path.join(wvtconf.PROPERTIES_PATH, 'properties.json')
        if not os.path.exists(properties_path):
            raise ValueError(f"Properties file does not exist under {properties_path}! Ensure that the database interface was opened at least once!")
        with open(properties_path) as f:
            properties = json.load(f)
        for prop in properties.keys():
            strat_file = f'strat_{prop}.py'
            strat_path = os.path.join(wvtconf.STRATEGIES_PATH, strat_file)
            if not os.path.exists(strat_path):
                raise ValueError(f"Could not find strategy for property {prop}!")
            spec = importlib.util.spec_from_file_location(
                name=prop,
                location=strat_path
            )
            strategy = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(strategy)
            if not hasattr(strategy, 'run'):
                raise ValueError(f"Strategy '{strat_file}' has no 'run' function!")
            strategies[prop] = strategy
        return strategies

    def process(self):
        """Abstract method to be implemented by subclasses."""
        raise NotImplementedError("This method is not implemented in ProcessorBase")

    def _prepare_chunks(self, document_name: str, document_content: str, chunks: list[str]) -> list[dict]:
        """Run every property strategy over every chunk.

        Args:
            document_name: Basename of the processed source.
            document_content: Full extracted text of the document.
            chunks: Text chunks to prepare for import.

        Returns:
            list[dict]: One dict per chunk mapping property name to the
            value computed by that property's strategy.
        """
        prepared_chunks = []
        for chunk in chunks:
            prepared_chunk = {
                prop: strat.run(document_name, document_content, chunk)
                for prop, strat in self._strategies.items()
            }
            prepared_chunks.append(prepared_chunk)
        return prepared_chunks

    def _clean_content(self, document_content: str) -> str:
        """Removes the garbage symbols from text.

        Collapses stray whitespace around '/', '.' and '|', strips
        OCR number/date artifacts, normalizes Unicode to NFC and squeezes
        repeated blank lines and spaces.
        """
        cleaned = re.sub(r'\s+/\s+', '/', document_content)
        cleaned = re.sub(r'\s+\.\s+', '.', cleaned)
        # NOTE(review): this turns every ", " into "." — presumably aimed at
        # OCR-garbled numbers, but it also rewrites ordinary prose commas;
        # confirm this is intentional.
        cleaned = re.sub(r',\s+', '.', cleaned)
        cleaned = re.sub(r'\s+\|\s+', ' ', cleaned)
        cleaned = re.sub(r'\/\s+', '/', cleaned)
        cleaned = re.sub(r'\s+/', '/', cleaned)
        cleaned = re.sub(r'\s+\.', '.', cleaned)
        cleaned = re.sub(r'(\d+)\s*,\s*(\d{4})', r'\1', cleaned)
        cleaned = re.sub(r'(\d+)\s*/\s*(\d+)', r'\1', cleaned)
        # NOTE(review): the next sub replaces ".NNNN" with ".NNNN" — a no-op
        # as written; confirm whether the year was meant to be dropped.
        cleaned = re.sub(r'\.(\d{4})', r'.\1', cleaned)
        # The original chain of `.replace('ä', 'ä')`-style calls appears to be
        # a no-op umlaut repair; NFC normalization performs the intended
        # combining-mark → precomposed-character repair for all characters.
        cleaned = unicodedata.normalize('NFC', cleaned)
        cleaned = re.sub(r'\n\s*\n+', '\n\n', cleaned)
        cleaned = re.sub(r' +', ' ', cleaned)
        return cleaned

    def _extract_document_content(self, document: "DoclingDocument") -> str:
        """Compiles text chunks found in the document into a single string.

        Groups text items per page, orders them top-to-bottom then
        left-to-right, and joins them into lines/paragraphs based on the
        vertical gap between consecutive items.
        """
        page_texts = defaultdict(list)
        for text_item in document.texts:
            if not text_item.text.strip():
                continue
            # Only the first provenance entry is used for placement.
            prov = text_item.prov[0] if text_item.prov else None
            if prov:
                page_number = prov.page_no
                bbox = prov.bbox
                page_texts[page_number].append({
                    'text': text_item.text.strip(),
                    'top': bbox.t,
                    'left': bbox.l,
                    'bottom': bbox.b,
                })
        full_page_texts = []
        for page_number in sorted(page_texts.keys()):
            # Negative 'top' sorts top-of-page first (bbox origin appears to
            # be bottom-left — TODO confirm docling coordinate convention).
            text_items = sorted(
                page_texts[page_number],
                key=lambda text: (-text['top'], text['left']),
            )
            content = []
            last_bottom = None
            line_threshold = 15  # vertical gap that starts a new line (typo 'treshold' fixed)
            for item in text_items:
                text = item['text']
                if last_bottom is not None and (last_bottom - item['bottom'] > line_threshold):
                    if content:
                        full_page_texts.append(' '.join(content))
                    content = []
                    if last_bottom - item['bottom'] > 50:
                        # Large gap: treat as a paragraph break.
                        full_page_texts.append("")
                content.append(text)
                last_bottom = item['bottom']
            if content:
                full_page_texts.append(' '.join(content))
        full_text = '\n\n'.join(full_page_texts)
        cleaned_text = self._clean_content(full_text)
        return cleaned_text

    def _collect_chunks(self, document: "DoclingDocument") -> list[str]:
        """Chunk the document with the hybrid chunker, contextualizing each chunk."""
        chunks = []
        for base_chunk in self._chunker.chunk(dl_doc=document):
            enriched = self._chunker.contextualize(chunk=base_chunk)
            chunks.append(enriched)
        return chunks

    def _collect_chunks_fallback(self, document_content: str) -> list[str]:
        """
        Chunks the compiled text manually.

        Token-window fallback: slides a window of ``max_tokens`` tokens over
        the encoded text with a fixed 50-token overlap between windows.

        Args:
            document_content (str): The full content extracted from document.

        Returns:
            list[str]: List of text chunks.
        """
        tokenizer_wrapper = self._chunker.tokenizer
        # HuggingFaceTokenizer wraps the raw HF tokenizer; fall back to the
        # wrapper itself if no inner tokenizer attribute exists.
        tokenizer = getattr(tokenizer_wrapper, 'tokenizer', tokenizer_wrapper)
        tokens = tokenizer.encode(document_content)
        chunk_size = self._chunker.max_tokens
        overlap = 50
        # Guard against a non-positive stride (would raise/loop forever) when
        # chunk_size <= overlap.
        step = max(1, chunk_size - overlap)
        collected_chunks = []
        for i in range(0, len(tokens), step):
            chunk_tokens = tokens[i:i + chunk_size]
            chunk = tokenizer.decode(
                chunk_tokens,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True
            )
            collected_chunks.append(chunk)
        return collected_chunks
class DocumentProcessor(ProcessorBase):
    """Processes a single on-disk document into prepared, importable chunks."""

    def process(self, source: Path | str) -> ProcessingResult:
        """
        Process a single document source, converting it to text and chunking it.

        Args:
            source (Path | str): Path to the document to process.

        Returns:
            ProcessingResult: The result of the processing operation, including chunks and language.
        """
        # os.path.isfile() is already False for missing paths, so the former
        # separate exists() check was redundant.
        if not os.path.isfile(source):
            datalogger.error(f"Failed to initiate processing pipeline for source {source}: file does not exist")
            # ProcessingStatus is imported from src.pipeline.utilclasses at
            # module level (the original file referenced it without importing).
            return ProcessingResult(status=ProcessingStatus.NOT_FOUND)
        document_name = os.path.basename(source)
        datalogger.info(f"Initiating processing pipeline for source {document_name}")
        self._logging_callback(f'Converting source {document_name}...', 20)
        document = self._converter.convert(source).document
        self._logging_callback(f'Collecting chunks from {document_name}...', 40)
        collected_chunks = self._collect_chunks(document)
        document_content = MarkdownDocSerializer(doc=document).serialize().text
        if len(collected_chunks) <= 1:  # Document content manual extraction
            # Chunker produced (almost) nothing: rebuild the text from the
            # positioned text items, round-trip it through the converter as
            # Markdown, and chunk again.
            document_content = self._extract_document_content(document)
            document = self._converter.convert_string(
                content=document_content,
                format=InputFormat.MD
            ).document
            collected_chunks = self._collect_chunks(document)
        self._logging_callback(f'Preparing chunks for {document_name} for importing...', 60)
        prepared_chunks = self._prepare_chunks(document_name, document_content, collected_chunks)
        datalogger.info(f"Successfully collected {len(prepared_chunks)} chunks from {document_name}")
        return ProcessingResult(
            chunks=prepared_chunks,
            source=document_name,
            lang=detect_language(document_content),
        )
class WebsiteProcessor(ProcessorBase):
    """Website ingestion processor (stub).

    Not implemented yet: `process` falls through to ProcessorBase and raises
    NotImplementedError. The module-level `weblogger` is presumably reserved
    for this class — confirm before implementing.
    """
    pass