# hsg_rag_eea/src/scraping/html_processor.py
# Pygmales - synched versions - 698965e - 7.08 kB
from docling.document_converter import InputFormat
from docling_core.types.doc.document import DoclingDocument, TitleItem
from .types import ChunkMetadata
from ..config import config
from ..pipeline.processors import ProcessorBase
from ..utils.logging import get_logger
logger = get_logger('scraper.processor')
class HTMLProcessor(ProcessorBase):
    """Turn scraped HTML pages into Docling documents and text chunks.

    The methods below use ``self._converter``, ``self._chunker`` and
    ``self._prepare_chunks``; none of them is defined in this class, so they
    are expected to be provided by ``ProcessorBase``.
    """

    def __init__(self) -> None:
        super().__init__()

    def process(self, url: str, html_content: str) -> DoclingDocument | None:
        """Convert an HTML string into a ``DoclingDocument`` named after its URL.

        Args:
            url: Address the HTML was fetched from; stored as the document name.
            html_content: Raw HTML to convert.

        Returns:
            The converted document, or ``None`` when ``html_content`` is empty
            or the conversion raises.
        """
        if not html_content:
            logger.warning('Nothing to process, HTML body is empty!')
            return None

        logger.info(f"Analyzing page layout of URL '{url}'...")
        try:
            document = self._converter.convert_string(html_content, InputFormat.HTML).document
            document.name = url
            return document
        except Exception as e:
            # Best effort: one page failing to convert must not stop the caller.
            logger.error(f"Failed to analyze page layout: {e}")
            return None

    def prepare_chunks(self, url: str, url_text: str, metas: list[ChunkMetadata]) -> dict[str, list]:
        """Group chunk texts by language and run them through ``_prepare_chunks``.

        Args:
            url: Forwarded unchanged to ``self._prepare_chunks``.
            url_text: Forwarded unchanged to ``self._prepare_chunks``.
            metas: Chunk metadata objects; ``language`` and ``text`` are read.

        Returns:
            A dict with one key per configured language, each mapped to the
            result of ``self._prepare_chunks`` for that language's texts.
            Chunks whose language is not configured are skipped with a warning
            instead of raising ``KeyError``.
        """
        languages = config.get('AVAILABLE_LANGUAGES', ['en', 'de'])
        texts_by_language = {lang: [] for lang in languages}

        for meta in metas:
            bucket = texts_by_language.get(meta.language)
            if bucket is None:
                logger.warning(
                    f"Skipping chunk with unsupported language '{meta.language}' for URL '{url}'"
                )
                continue
            bucket.append(meta.text)

        return {
            lang: self._prepare_chunks(url, url_text, texts)
            for lang, texts in texts_by_language.items()
        }

    def extract_title(self, document: DoclingDocument) -> str:
        """Return the text of the first ``TitleItem`` in the document.

        Returns:
            The first title's text, or ``'No Title'`` if the document has none.
        """
        return next(
            (item.text for item in document.texts if isinstance(item, TitleItem)),
            'No Title',
        )

    def chunk(self, document: DoclingDocument) -> list[dict]:
        """Chunk a document and describe every chunk as a small dict.

        Returns:
            One dict per merged chunk with the keys ``'text'`` (the chunk),
            ``'title'`` (its first line) and ``'size'`` (its token count
            according to the chunker's tokenizer).
        """
        raw_chunks = list(self._chunker.chunk(document))
        count_tokens = self._chunker.tokenizer.count_tokens
        return [
            {
                'text': text,
                'title': text.split('\n', 1)[0],
                'size': count_tokens(text),
            }
            for text in self._merge_chunks_by_headings(raw_chunks)
        ]

    def merge_chunks_by_topic(self, chunk_metadatas: list[ChunkMetadata]) -> list[ChunkMetadata]:
        """Merge runs of consecutive small chunks that share a topic.

        A run is closed when the topic changes, when adding the next chunk
        would exceed ``config.processing.MAX_TOKENS``, or when a chunk that is
        already at least ``MAX_TOKENS`` large is met; such a large chunk is
        passed through unmerged.

        Args:
            chunk_metadatas: Chunks in document order; ``topic`` and
                ``token_size`` are read.

        Returns:
            The chunks in the same order, with each run replaced by the result
            of ``_create_merged_chunk``.
        """
        max_tokens = config.processing.MAX_TOKENS
        merged_chunks = []
        current_group = []
        current_tokens = 0
        current_topic = None

        for chunk in chunk_metadatas:
            topic = chunk.topic
            token_size = chunk.token_size

            # A chunk that is already large enough is never merged.
            if token_size >= max_tokens:
                # The consecutive group is over when a large chunk is met.
                if current_group:
                    merged_chunks.append(self._create_merged_chunk(current_group))
                current_group, current_tokens, current_topic = [], 0, None
                merged_chunks.append(chunk)
                continue

            # NOTE(review): a falsy current topic (None / '') never counts as a
            # topic change, so an untitled chunk merges with the topic after it,
            # but not with the topic before it. Confirm this is intended.
            topic_changed = bool(current_topic) and topic != current_topic
            over_budget = current_tokens + token_size > max_tokens
            if topic_changed or over_budget:
                if current_group:
                    merged_chunks.append(self._create_merged_chunk(current_group))
                current_group, current_tokens = [], 0

            current_group.append(chunk)
            current_tokens += token_size
            current_topic = topic

        if current_group:
            merged_chunks.append(self._create_merged_chunk(current_group))
        return merged_chunks

    def _create_merged_chunk(self, group: list[ChunkMetadata]) -> ChunkMetadata:
        """Combine a non-empty group of chunks into one ``ChunkMetadata``.

        A single-element group is returned as is. Otherwise the texts are
        joined with newlines, the token sizes are summed, and every other
        field is copied from the first chunk of the group.

        Args:
            group: Chunks to combine, in order. Must not be empty.

        Returns:
            The only chunk of the group, or a new merged chunk whose
            ``original_chunk_ids`` lists the ids of all members.
        """
        if len(group) == 1:
            return group[0]

        first, last = group[0], group[-1]
        return ChunkMetadata(
            chunk_id=f"merged_{first.topic}_{first.chunk_id}_to_{last.chunk_id}",
            text="\n".join(cm.text for cm in group).strip(),
            source_url=first.source_url,
            program=first.program,
            language=first.language,
            topic=first.topic,
            last_scraped=first.last_scraped,
            page_title=first.page_title,
            section_heading=first.section_heading,
            # Sum of the members' sizes; the joining newlines are not counted.
            token_size=sum(cm.token_size for cm in group),
            original_chunk_ids=[cm.chunk_id for cm in group],
        )

    def _get_formatted_chunk_text(self, chunk, headings) -> str:
        """Prefix a chunk's text with its space-joined headings on a first line.

        Line breaks inside the text are kept only when one of the chunk's
        ``meta.doc_items`` is labelled ``'table'`` or ``'list_item'``;
        otherwise they are replaced by spaces.
        """
        heading_line = f"{' '.join(headings)}\n"
        doc_items = getattr(chunk.meta, 'doc_items', None) or ()
        keeps_line_breaks = any(item.label in ('table', 'list_item') for item in doc_items)
        if keeps_line_breaks:
            return heading_line + chunk.text
        return heading_line + chunk.text.replace('\n', ' ')

    def _merge_chunks_by_headings(self, raw_chunks: list) -> list[str]:
        """
        Groups consecutive chunks that share the same parent headings and merges them into one clean chunk.

        Chunks with fewer than two headings are formatted on their own via
        ``_get_formatted_chunk_text``. Every other chunk opens a group that
        absorbs the following chunks while their first two headings are equal.

        Returns:
            The merged chunk texts in document order.
        """
        prefix_level = 2
        merged = []
        i = 0
        n = len(raw_chunks)

        while i < n:
            chunk = raw_chunks[i]
            headings = getattr(chunk.meta, "headings", []) or []

            if len(headings) < prefix_level:
                merged.append(self._get_formatted_chunk_text(chunk, headings))
                i += 1
                continue

            # Start a new group with this prefix. The first pass of the inner
            # loop always matches (it is the same chunk), so ``i`` advances.
            common_prefix = headings[:prefix_level]
            group = []
            while i < n:
                curr_chunk = raw_chunks[i]
                curr_headings = getattr(curr_chunk.meta, "headings", []) or []
                if curr_headings[:prefix_level] != common_prefix:
                    break

                leaf_heading = curr_headings[-1] if len(curr_headings) > prefix_level else ""
                content = curr_chunk.text.replace('\n', ' ').strip()
                if leaf_heading and content:
                    group.append(f"{leaf_heading}: {content}")
                elif content:
                    group.append(content)
                i += 1

            # Build the final merged chunk. The joins stay outside the f-string:
            # a backslash inside a replacement field is a SyntaxError before
            # Python 3.12.
            if len(group) > 1:
                # Header: the first chunk's headings without the first and last.
                header = "\n".join(headings[1:-1])
                body = "\n".join(group)
            else:
                # Header: all of the first chunk's headings except the first.
                header = "\n".join(headings[1:])
                body = chunk.text
            merged.append(f"{header}\n{body}".strip())

        return merged