from docling.document_converter import InputFormat
from docling_core.types.doc.document import DoclingDocument, TitleItem

from .types import ChunkMetadata
from ..config import config
from ..pipeline.processors import ProcessorBase
from ..utils.logging import get_logger

logger = get_logger('scraper.processor')


class HTMLProcessor(ProcessorBase):
    """Converts scraped HTML pages into DoclingDocuments and topic-merged chunks."""

    def __init__(self) -> None:
        super().__init__()

    def process(self, url: str, html_content: str) -> DoclingDocument | None:
        """Convert an HTML page into a DoclingDocument, or return None on failure."""
        if not html_content:
            logger.warning('Nothing to process, HTML body is empty!')
            return None
        logger.info(f"Analyzing page layout of URL '{url}'...")
        try:
            document = self._converter.convert_string(html_content, InputFormat.HTML).document
            document.name = url
            return document
        except Exception as e:
            logger.error(f"Failed to analyze page layout: {e}")
            return None

    def prepare_chunks(self, url: str, url_text: str, metas: list[ChunkMetadata]) -> dict[str, list]:
        """Group chunk texts by language and run per-language preparation."""
        prepared_chunks = {lang: [] for lang in config.get('AVAILABLE_LANGUAGES', ['en', 'de'])}
        for meta in metas:
            prepared_chunks[meta.language].append(meta.text)
        for lang, chunks in prepared_chunks.items():
            prepared_chunks[lang] = self._prepare_chunks(url, url_text, chunks)
        return prepared_chunks
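
    # Example (hypothetical config values): with AVAILABLE_LANGUAGES = ['en', 'de'],
    # prepare_chunks returns a mapping like {'en': [...], 'de': [...]}, each value
    # being the output of self._prepare_chunks for that language.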

    def extract_title(self, document: DoclingDocument) -> str:
        """Return the first title found in the document, or a fallback string."""
        titles = [title.text for title in document.texts if isinstance(title, TitleItem)]
        return titles[0] if titles else 'No Title'

    def chunk(self, document: DoclingDocument) -> list[dict]:
        """Chunk the document and annotate each chunk with its title and token size."""
        raw_chunks = list(self._chunker.chunk(document))
        chunks = self._merge_chunks_by_headings(raw_chunks)
        prepared_chunks = [{
            'text': chunk,
            'title': chunk.split('\n')[0],
            'size': self._chunker.tokenizer.count_tokens(chunk),
        } for chunk in chunks]
        return prepared_chunks
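
    # Example of one prepared chunk returned by chunk() (hypothetical values):
    # {'text': 'Admissions\nDeadlines\nApply by May 31.', 'title': 'Admissions', 'size': 9}
    # where 'title' is the first line of the text and 'size' its token count.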

    def merge_chunks_by_topic(self, chunk_metadatas: list[ChunkMetadata]) -> list[ChunkMetadata]:
        """Merge consecutive same-topic chunks while staying under the token budget."""
        MAX_TOKENS = config.processing.MAX_TOKENS
        merged_chunks = []
        current_group = []
        current_tokens = 0
        current_topic = None
        for chunk in chunk_metadatas:
            topic = chunk.topic
            token_size = chunk.token_size
            # A chunk that is already large enough is never merged
            if token_size >= MAX_TOKENS:
                # The consecutive group ends when a large chunk is met
                if current_group:
                    merged_chunks.append(self._create_merged_chunk(current_group))
                    current_group = []
                    current_tokens = 0
                    current_topic = None
                # Append the large chunk on its own
                merged_chunks.append(chunk)
                continue
            # A topic change or an exhausted budget closes the group and starts a new one
            if (current_topic and topic != current_topic) or (current_tokens + token_size > MAX_TOKENS):
                if current_group:
                    merged_chunks.append(self._create_merged_chunk(current_group))
                current_group = [chunk]
                current_tokens = token_size
                current_topic = topic
                continue
            current_group.append(chunk)
            current_tokens += token_size
            current_topic = topic
        if current_group:
            merged_chunks.append(self._create_merged_chunk(current_group))
        return merged_chunks
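
    # Worked example (hypothetical sizes, MAX_TOKENS = 512): given consecutive
    # chunks [('fees', 200), ('fees', 250), ('fees', 300), ('dates', 100)], the
    # first two merge into one 450-token chunk, the third starts a fresh group
    # because 450 + 300 exceeds the budget, and the switch to 'dates' then
    # closes that group too, yielding three output chunks in total.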

    def _create_merged_chunk(self, group: list[ChunkMetadata]) -> ChunkMetadata:
        """Collapse a group of chunks into one ChunkMetadata; singletons pass through."""
        if len(group) == 1:
            return group[0]
        merged_text = '\n'.join(cm.text for cm in group).strip()
        total_tokens = sum(cm.token_size for cm in group)
        first = group[0]
        merged_id = f"merged_{first.topic}_{first.chunk_id}_to_{group[-1].chunk_id}"
        merged_chunk = ChunkMetadata(
            chunk_id=merged_id,
            text=merged_text,
            source_url=first.source_url,
            program=first.program,
            language=first.language,
            topic=first.topic,
            last_scraped=first.last_scraped,
            page_title=first.page_title,
            section_heading=first.section_heading,
            token_size=total_tokens,
            original_chunk_ids=[c.chunk_id for c in group],
        )
        return merged_chunk

    def _get_formatted_chunk_text(self, chunk, headings) -> str:
        """Prefix a chunk with its headings, flattening newlines except for tables and lists."""
        formatted_text = f"{' '.join(headings)}\n"
        if not hasattr(chunk.meta, 'doc_items'):
            return formatted_text + chunk.text.replace('\n', ' ')
        labels = {item.label for item in chunk.meta.doc_items}
        # Tables and list items rely on line breaks, so keep their newlines intact
        if any(label in ('table', 'list_item') for label in labels):
            return formatted_text + chunk.text
        return formatted_text + chunk.text.replace('\n', ' ')
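
    # Example: a chunk whose doc_items carry a 'table' or 'list_item' label
    # keeps its line breaks (row and bullet structure matters), while plain
    # paragraphs are flattened onto one line beneath the joined headings.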

    def _merge_chunks_by_headings(self, raw_chunks: list) -> list[str]:
        """
        Groups consecutive chunks that share the same parent headings and merges them into one clean chunk.
        """
        prefix_level = 2
        merged = []
        i = 0
        n = len(raw_chunks)
        while i < n:
            chunk = raw_chunks[i]
            headings = getattr(chunk.meta, 'headings', []) or []
            if len(headings) < prefix_level:
                formatted_text = self._get_formatted_chunk_text(chunk, headings)
                merged.append(formatted_text)
                i += 1
                continue
            # Start a new group with this heading prefix
            common_prefix = '\n'.join(headings[:prefix_level])
            group = []
            while i < n:
                curr_chunk = raw_chunks[i]
                curr_headings = getattr(curr_chunk.meta, 'headings', []) or []
                curr_prefix = '\n'.join(curr_headings[:prefix_level])
                if curr_prefix != common_prefix:
                    break
                leaf_heading = curr_headings[-1] if len(curr_headings) > prefix_level else ''
                content = curr_chunk.text.replace('\n', ' ').strip()
                if leaf_heading and content:
                    group.append(f'{leaf_heading}: {content}')
                elif content:
                    group.append(content)
                i += 1
            # Build the final merged chunk; the joins are hoisted out of the
            # f-strings, since backslashes inside f-string expressions require
            # Python 3.12+
            if len(group) > 1:
                heading_block = '\n'.join(headings[1:-1])
                body = '\n'.join(group)
                full_chunk = f'{heading_block}\n{body}'
            else:
                heading_block = '\n'.join(headings[1:])
                full_chunk = f'{heading_block}\n{chunk.text}'
            merged.append(full_chunk.strip())
        return merged
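

# Minimal usage sketch (assumptions: ProcessorBase wires up self._converter and
# self._chunker, and the URL and HTML below are placeholders, not real inputs):
#
#     processor = HTMLProcessor()
#     document = processor.process('https://example.com/page', '<html>...</html>')
#     if document is not None:
#         title = processor.extract_title(document)
#         chunk_dicts = processor.chunk(document)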