import os, re, hashlib, time
from enum import Enum
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass
from transformers import AutoTokenizer
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from docling.document_converter import DocumentConverter
from docling.chunking import HybridChunker
from docling_core.types.doc.document import DoclingDocument
from src.utils.lang import detect_language
from src.utils.logging import get_logger
from config import BASE_URL, CHUNK_MAX_TOKENS

weblogger = get_logger("website_processor")
datalogger = get_logger("data_processor")

_TRANSFORMERS_TOKENIZER = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

_EN_URL_PATTERN = r'\[EN\]\((https://emba\.unisg\.ch/en/[^\s)]+)\)'
_PROGRAM_URL_PATTERN = r'https://emba\.unisg\.ch/(?:programm[^\s)]+|en/embax)'
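# Illustrative matches for the patterns above (the concrete paths are hypothetical):
#   _EN_URL_PATTERN captures the target of a markdown link such as
#     "[EN](https://emba.unisg.ch/en/programme)" -> "https://emba.unisg.ch/en/programme"
#   _PROGRAM_URL_PATTERN matches bare program URLs such as
#     "https://emba.unisg.ch/programm-emba" or "https://emba.unisg.ch/en/embax"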

def _get_hash(text: str) -> str:
    """Generate an MD5 hash for the given text."""
    return hashlib.md5(text.strip().encode("utf-8")).hexdigest()
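# Example: whitespace is stripped before hashing, so padded and unpadded text map to
# the same identifier:
#   _get_hash("  Hello  ") == _get_hash("Hello")  # -> True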

def _get_en_version(text: str):
    """Extract the English version URL from the given text, if available."""
    result = re.search(_EN_URL_PATTERN, text)
    if result:
        return result.group(1)
    return ""


def _get_program_urls(text: str):
    """Find all program URLs in the given text."""
    return re.findall(_PROGRAM_URL_PATTERN, text)

def _detect_programs(text: str) -> list[str]:
    """
    Identify MBA program names mentioned in the given text.

    Args:
        text (str): The text to search for program mentions.

    Returns:
        list[str]: List of detected program identifiers.
    """
    programs = []
    lc_text = text.lower()
    found = lambda txt: txt in lc_text
    if found('emba') or found('executive mba'):
        programs.append('emba')
    if found('iemba') or found('international emba') or found('international executive mba'):
        programs.append('iemba')
    if found('emba x'):
        programs.append('emba_x')
    return programs
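# Example (hypothetical input): program checks are plain substring matches on the
# lower-cased text, so a single sentence can yield several identifiers:
#   _detect_programs("Our Executive MBA and EMBA X programmes")  # -> ['emba', 'emba_x']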

class ProcessingStatus(Enum):
    NOT_FOUND = 1
    SUCCESS = 2
    FAILURE = 3
    INCORRECT_FORMAT = 5


@dataclass
class _ChunkMetadata:
    programs: list[str]
    date: datetime
    document_id: str
    language: str
    source: str = None


@dataclass
class ProcessingResult:
    status: ProcessingStatus = ProcessingStatus.SUCCESS
    chunks: list = None
    document_id: str = None
    language: str = None

class _ProcessorBase:
    def __init__(self):
        """Initialize the processor with a document converter and a token-aware chunker."""
        self._converter = DocumentConverter()
        self._chunker = HybridChunker(
            tokenizer=HuggingFaceTokenizer(
                tokenizer=_TRANSFORMERS_TOKENIZER,
                max_tokens=CHUNK_MAX_TOKENS
            ),
            max_tokens=CHUNK_MAX_TOKENS,
            merge_peers=True
        )

    def process(self):
        """Abstract method to be implemented by subclasses."""
        raise NotImplementedError("This method is not implemented in _ProcessorBase")

    def _collect_metadata(self, text: str) -> _ChunkMetadata:
        """Collect metadata such as programs, date, language, and document hash."""
        return _ChunkMetadata(
            programs=_detect_programs(text),
            date=datetime.now(timezone.utc),
            language=detect_language(text),
            document_id=_get_hash(text))

    def _collect_chunks(self, document: DoclingDocument, metadata: _ChunkMetadata) -> list:
        """
        Collect text chunks from a document and prepare them with metadata.

        Args:
            document (DoclingDocument): The converted document object.
            metadata (_ChunkMetadata): Metadata containing program, source, and date information.

        Returns:
            list[dict]: List of chunk dictionaries containing text and metadata.
        """
        chunks = [self._chunker.contextualize(chunk=c) for c in self._chunker.chunk(document)]
        prepared_chunks = []
        for chunk in chunks:
            c_hash = _get_hash(chunk)
            prepared_chunks.append({
                'body': chunk,
                'chunk_id': c_hash,
                'document_id': metadata.document_id,
                'programs': metadata.programs,
                'date': metadata.date,
                'source': metadata.source
            })
        return prepared_chunks
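
# A prepared chunk is a flat dict; a sketch of its shape (values are illustrative,
# not real output):
#   {'body': '<contextualized chunk text>', 'chunk_id': '<md5 of the chunk text>',
#    'document_id': '<md5 of the full document text>', 'programs': ['emba'],
#    'date': datetime(..., tzinfo=timezone.utc), 'source': '<url or file name>'}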

class WebsiteProcessor(_ProcessorBase):
    def __init__(self):
        """Initialize the WebsiteProcessor with base processing capabilities."""
        super().__init__()

    def process(self) -> list[ProcessingResult]:
        """
        Scrape and process program pages from the HSG website.

        Returns:
            list[ProcessingResult]: A list of processing results for each processed URL.
        """
        weblogger.info("Initiating scraping and processing of the HSG program pages.")
        urls = [BASE_URL]
        results = []
        while urls:
            url = urls.pop()
            result, text = self._process_url(url)
            if result.status != ProcessingStatus.SUCCESS:
                weblogger.warning(f"Failed to process URL {url}.")
                continue
            if url == BASE_URL:
                program_urls = _get_program_urls(text)
                urls.extend(program_urls)
                weblogger.info(f"Found the following program URLs: {', '.join(program_urls)}.")
            if '/en/' not in url:
                en_url = _get_en_version(text)
                if en_url:
                    urls.append(en_url)
                    weblogger.info(f"Added the English version of the URL {en_url} to the processing list.")
            results.append(result)
            time.sleep(2)  # throttle requests between pages
        weblogger.info(f"Successfully processed {len(results)} URLs.")
        return results

    def _process_url(self, url: str) -> tuple[ProcessingResult, str]:
        """
        Process the content of a single URL, converting it into chunks with metadata.

        Args:
            url (str): The URL of the webpage to process.

        Returns:
            tuple[ProcessingResult, str]: The processing result and the extracted text.
        """
        weblogger.info(f"Initiating processing pipeline for url {url}")
        try:
            document = self._converter.convert(url).document
        except Exception as e:
            weblogger.error(f"Failed to load the contents of the page {url}: {e}")
            return ProcessingResult(status=ProcessingStatus.FAILURE), ""
        text = document.export_to_markdown()
        metadata = self._collect_metadata(text)
        metadata.source = url
        collected_chunks = self._collect_chunks(document, metadata)
        del collected_chunks[0]  # drop the leading chunk of the page
        weblogger.info(f"Successfully collected {len(collected_chunks)} chunks from {url}")
        return ProcessingResult(chunks=collected_chunks, language=metadata.language, document_id=metadata.document_id), text

class DataProcessor(_ProcessorBase):
    """
    Handles document processing, including conversion, chunking, language detection,
    and hash-based deduplication.
    """

    def process_many_documents(self, sources: list[Path | str]) -> list[ProcessingResult]:
        """
        Process a list of document sources sequentially.

        Args:
            sources (list[Path | str]): List of file paths or URLs to process.

        Returns:
            list[ProcessingResult]: List of results for each processed document.
        """
        return [self.process(source) for source in sources]

    def process(self, source: Path | str) -> ProcessingResult:
        """
        Process a single document source, converting it to text, chunking, and hashing.

        Args:
            source (Path | str): Path to the document to process.

        Returns:
            ProcessingResult: The result of the processing operation, including chunks and language.
        """
        if not os.path.exists(source) or not os.path.isfile(source):
            datalogger.error(f"Failed to initiate processing pipeline for source {source}: file does not exist")
            return ProcessingResult(status=ProcessingStatus.NOT_FOUND)
        datalogger.info(f"Initiating processing pipeline for source {source}")
        try:
            document = self._converter.convert(source).document
        except Exception as e:
            datalogger.error(f"Failed to convert source {source}: {e}")
            return ProcessingResult(status=ProcessingStatus.INCORRECT_FORMAT)
        metadata = self._collect_metadata(document.export_to_markdown())
        metadata.source = os.path.basename(source)
        collected_chunks = self._collect_chunks(document, metadata)
        datalogger.info(f"Successfully collected {len(collected_chunks)} chunks from {source}")
        return ProcessingResult(chunks=collected_chunks, language=metadata.language, document_id=metadata.document_id)
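
# Minimal usage sketch for DataProcessor (the file path is hypothetical):
#   processor = DataProcessor()
#   result = processor.process("data/program_brochure.pdf")
#   if result.status == ProcessingStatus.SUCCESS:
#       print(result.language, len(result.chunks))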

if __name__ == "__main__":
    processor = WebsiteProcessor()
    results = processor.process()
    for result in results:
        for chunk in result.chunks:
            print(chunk['body'], end='\n\n')
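    # Batch processing with DataProcessor would look like this (hypothetical files):
    #   data_processor = DataProcessor()
    #   for res in data_processor.process_many_documents(["docs/brochure.pdf", "docs/faq.docx"]):
    #       print(res.status, res.document_id)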