Spaces:
Sleeping
Sleeping
| from collections import defaultdict | |
| import os, re, hashlib, time, json | |
| import importlib.util | |
| from pathlib import Path | |
| from transformers import AutoTokenizer | |
| from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer | |
| from docling.datamodel.pipeline_options import ( | |
| PdfPipelineOptions, | |
| RapidOcrOptions, | |
| LayoutOptions, | |
| ) | |
| from docling_core.transforms.serializer.markdown import MarkdownDocSerializer | |
| from docling.document_converter import DocumentConverter, PdfFormatOption, InputFormat | |
| from docling.chunking import HybridChunker | |
| from docling_core.types.doc.document import DoclingDocument | |
| from src.pipeline.utilclasses import ProcessingResult | |
| from src.utils.lang import detect_language | |
| from src.utils.logging import get_logger | |
| from config import BASE_URL, CHUNK_MAX_TOKENS, WeaviateConfiguration as wvtconf | |
# Loggers are created once at import time. `datalogger` is the one used by
# DocumentProcessor in this module; `weblogger` is kept available under the
# "website_processor" name.
weblogger = get_logger("website_processor")
datalogger = get_logger("data_processor")
| class ProcessorBase: | |
| def __init__(self, logging_callback) -> None: | |
| pipeline_options = PdfPipelineOptions( | |
| do_ocr=True, | |
| ocr_options=RapidOcrOptions( | |
| force_full_page_ocr=True, | |
| ), | |
| generate_page_images=False, | |
| images_scale=3.0, | |
| do_layout_analysis=True, | |
| do_table_structure=True, | |
| do_cell_matching=True, | |
| layout_options=LayoutOptions( | |
| model_spec={ | |
| "name": "docling_layout_egret_medium", | |
| "repo_id": "docling-project/docling-layout-egret-medium", | |
| "revision": "main", | |
| "model_path": "", | |
| "supported_devices": ["cuda"] | |
| }, | |
| create_orphan_clusters=True, | |
| keep_empty_clusters=False, | |
| skip_cell_assignment=False, | |
| ), | |
| ) | |
| self._converter: DocumentConverter = DocumentConverter( | |
| format_options={ | |
| InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options), | |
| }, | |
| ) | |
| tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2") | |
| self._chunker = HybridChunker( | |
| tokenizer=HuggingFaceTokenizer( | |
| tokenizer=tokenizer, | |
| max_tokens=CHUNK_MAX_TOKENS | |
| ), | |
| max_tokens=CHUNK_MAX_TOKENS, | |
| merge_peers=True | |
| ) | |
| self._strategies = self._load_strategies() | |
| self._logging_callback = logging_callback | |
| def _load_strategies(self): | |
| properties = {} | |
| strategies = {} | |
| os.makedirs(wvtconf.PROPERTIES_PATH, exist_ok=True) | |
| os.makedirs(wvtconf.STRATEGIES_PATH, exist_ok=True) | |
| properties_path = os.path.join(wvtconf.PROPERTIES_PATH, 'properties.json') | |
| if not os.path.exists(properties_path): | |
| raise ValueError(f"Properties file does not exist under {properties_path}! Ensure that the database interface was opened at least once!") | |
| with open(properties_path) as f: | |
| properties = json.load(f) | |
| for prop in properties.keys(): | |
| strat_file = f'strat_{prop}.py' | |
| strat_path = os.path.join(wvtconf.STRATEGIES_PATH, strat_file) | |
| if not os.path.exists(strat_path): | |
| raise ValueError(f"Could not find strategy for property {prop}!") | |
| spec = importlib.util.spec_from_file_location( | |
| name=prop, | |
| location=strat_path | |
| ) | |
| strategy = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(strategy) | |
| if not hasattr(strategy, 'run'): | |
| raise ValueError(f"Strategy '{strat_file}' has no 'run' function!") | |
| strategies[prop] = strategy | |
| return strategies | |
| def process(self): | |
| """Abstract method to be implemented by subclasses.""" | |
| raise NotImplementedError("This method is not implemented in ProcessorBase") | |
| def _prepare_chunks(self, document_name: str, document_content: str, chunks: list[str]) -> list[dict]: | |
| prepared_chunks = [] | |
| for chunk in chunks: | |
| prepared_chunk = {} | |
| for prop, strat in self._strategies.items(): | |
| prepared_chunk[prop] = strat.run(document_name, document_content, chunk) | |
| prepared_chunks.append(prepared_chunk) | |
| return prepared_chunks | |
| def _clean_content(self, document_content: str) -> str: | |
| """Removes the garbage symbols from text.""" | |
| cleaned = re.sub(r'\s+/\s+', '/', document_content) | |
| cleaned = re.sub(r'\s+\.\s+', '.', cleaned) | |
| cleaned = re.sub(r',\s+', '.', cleaned) | |
| cleaned = re.sub(r'\s+\|\s+', ' ', cleaned) | |
| cleaned = re.sub(r'\/\s+', '/', cleaned) | |
| cleaned = re.sub(r'\s+/','/', cleaned) | |
| cleaned = re.sub(r'\s+\.', '.', cleaned) | |
| cleaned = re.sub(r'(\d+)\s*,\s*(\d{4})', r'\1', cleaned) | |
| cleaned = re.sub(r'(\d+)\s*/\s*(\d+)', r'\1', cleaned) | |
| cleaned = re.sub(r'\.(\d{4})', r'.\1', cleaned) | |
| cleaned = cleaned.replace('ä', 'ä').replace('ö', 'ö').replace('ü', 'ü') | |
| cleaned = re.sub(r'\n\s*\n+', '\n\n', cleaned) | |
| cleaned = re.sub(r' +', ' ', cleaned) | |
| return cleaned | |
| def _extract_document_content(self, document: DoclingDocument) -> str: | |
| """Compiles text chunks found in the document into a single string.""" | |
| page_texts = defaultdict(list) | |
| for text_item in document.texts: | |
| if not text_item.text.strip(): | |
| continue | |
| prov = text_item.prov[0] if text_item.prov else None | |
| if prov: | |
| page_number = prov.page_no | |
| bbox = prov.bbox | |
| page_texts[page_number].append({ | |
| 'text': text_item.text.strip(), | |
| 'top': bbox.t, | |
| 'left': bbox.l, | |
| 'bottom': bbox.b, | |
| }) | |
| full_page_texts = [] | |
| for page_number in sorted(page_texts.keys()): | |
| text_items = sorted( | |
| page_texts[page_number], | |
| key=lambda text: (-text['top'], text['left']), | |
| ) | |
| content = [] | |
| last_bottom = None | |
| line_treshold = 15 | |
| for item in text_items: | |
| text = item['text'] | |
| if last_bottom is not None and (last_bottom - item['bottom'] > line_treshold): | |
| if content: | |
| full_page_texts.append(' '.join(content)) | |
| content = [] | |
| if last_bottom - item['bottom'] > 50: | |
| full_page_texts.append("") | |
| content.append(text) | |
| last_bottom = item['bottom'] | |
| if content: | |
| full_page_texts.append(' '.join(content)) | |
| full_text = '\n\n'.join(full_page_texts) | |
| cleaned_text = self._clean_content(full_text) | |
| return cleaned_text | |
| def _collect_chunks(self, document: DoclingDocument) -> list[str]: | |
| chunks = [] | |
| for base_chunk in self._chunker.chunk(dl_doc=document): | |
| enriched = self._chunker.contextualize(chunk=base_chunk) | |
| chunks.append(enriched) | |
| return chunks | |
| def _collect_chunks_fallback(self, document_content: str) -> list[str]: | |
| """ | |
| Chunks the compiled text manually. | |
| Args: | |
| document_content (str): The full content extracted from document. | |
| Returns: | |
| list[str]: List of text chunks. | |
| """ | |
| tokenizer_wrapper = self._chunker.tokenizer | |
| tokenizer = getattr(tokenizer_wrapper, 'tokenizer', tokenizer_wrapper) | |
| tokens = tokenizer.encode(document_content) | |
| chunk_size = self._chunker.max_tokens | |
| overlap = 50 | |
| collected_chunks = [] | |
| for i in range(0, len(tokens), chunk_size-overlap): | |
| chunk_tokens = tokens[i:i+chunk_size] | |
| chunk = tokenizer.decode( | |
| chunk_tokens, | |
| skip_special_tokens=True, | |
| clean_up_tokenization_spaces=True | |
| ) | |
| collected_chunks.append(chunk) | |
| return collected_chunks | |
class DocumentProcessor(ProcessorBase):
    """Processor for documents stored as files on disk."""

    def process(self, source: Path | str) -> ProcessingResult:
        """
        Process a single document source, converting it to text, chunking, and hashing.

        The source is converted and chunked with the configured chunker. When
        that yields at most one chunk, the text is re-extracted positionally
        with ``_extract_document_content``, converted again as Markdown and
        re-chunked; this fallback is only adopted when it actually extracted
        text, so a document whose items carry no provenance keeps its
        original chunk. Progress is reported through the logging callback at
        20, 40 and 60.

        Args:
            source (Path | str): Path to the document to process.
        Returns:
            ProcessingResult: The result of the processing operation, including chunks and language.
        """
        # isfile() is already False for paths that do not exist.
        if not os.path.isfile(source):
            datalogger.error(f"Failed to initiate processing pipeline for source {source}: file does not exist")
            # ProcessingStatus was referenced here without being imported,
            # which raised NameError on this path.
            # NOTE(review): assumed to live next to ProcessingResult - confirm
            # and move to the module-level imports.
            from src.pipeline.utilclasses import ProcessingStatus
            return ProcessingResult(status=ProcessingStatus.NOT_FOUND)

        document_name = os.path.basename(source)
        datalogger.info(f"Initiating processing pipeline for source {document_name}")

        self._logging_callback(f'Converting source {document_name}...', 20)
        document = self._converter.convert(source).document

        self._logging_callback(f'Collecting chunks from {document_name}...', 40)
        collected_chunks = self._collect_chunks(document)

        document_content = None
        if len(collected_chunks) <= 1:  # Document content manual extraction
            extracted_content = self._extract_document_content(document)
            # Only switch to the positional extraction when it produced text;
            # otherwise the single existing chunk would be replaced by nothing.
            if extracted_content.strip():
                document_content = extracted_content
                document = self._converter.convert_string(
                    content=document_content,
                    format=InputFormat.MD
                ).document
                collected_chunks = self._collect_chunks(document)

        if document_content is None:
            # Serialize only when the Markdown text is actually used.
            document_content = MarkdownDocSerializer(doc=document).serialize().text

        self._logging_callback(f'Preparing chunks for {document_name} for importing...', 60)
        prepared_chunks = self._prepare_chunks(document_name, document_content, collected_chunks)

        datalogger.info(f"Successfully collected {len(prepared_chunks)} chunks from {document_name}")
        return ProcessingResult(
            chunks=prepared_chunks,
            source=document_name,
            lang=detect_language(document_content),
        )
class WebsiteProcessor(ProcessorBase):
    """Placeholder subclass that currently adds nothing to ProcessorBase."""