hsg_rag_eea / src /scraping /content_cleaner.py
Pygmales
synched versions
698965e
raw
history blame
5.6 kB
import json, os
from typing import Counter
from bs4 import BeautifulSoup
from docling_core.types.doc.document import DoclingDocument
from ..const.cc_whitelist import REPETITION_WHITELIST
from ..utils.logging import get_logger
from ..config import config
# Module-level logger, registered under the 'scraper.cleaning' name.
logger = get_logger('scraper.cleaning')
class ContentCleaner:
    """Detect repeated boilerplate across scraped documents and strip it.

    The methods suggest the following usage (confirm against the caller):
    feed every scraped document to :meth:`collect_repetitive_content`, call
    :meth:`perform_content_analysis` once to derive (or reload) the list of
    repetitive texts, then run :meth:`clean_document` on each document.

    All text comparisons are done on a normalized form of the text
    (surrounding whitespace stripped, lower-cased), see :meth:`_normalize`.
    """

    def __init__(self, full_scraping) -> None:
        """Create an empty cleaner.

        Args:
            full_scraping: When truthy, :meth:`perform_content_analysis`
                always recomputes the analysis instead of reusing a
                previously saved JSON file.
        """
        # Number of documents in which each normalized text occurred.
        self._repetitions_counter: Counter = Counter()
        # Normalized texts considered boilerplate; filled by
        # perform_content_analysis().
        self._repetitive_content: list[str] = []
        self.full_scraping: bool = full_scraping

    @staticmethod
    def _normalize(text: str) -> str:
        """Return the canonical comparison key for *text*."""
        return text.strip().lower()

    def clean_mobile_content(self, html: str) -> str:
        """Return *html* with every element of class ``show-sm`` removed."""
        # NOTE(review): 'html' is a markup-type hint, so Beautiful Soup picks
        # whichever HTML parser is installed; the serialized output may
        # therefore differ between environments. Consider pinning a parser.
        soup = BeautifulSoup(html, 'html')
        for element in soup.find_all(class_='show-sm'):
            element.decompose()
        return str(soup)

    def extract_urls(self, document: "DoclingDocument") -> list[str]:
        """Return the hyperlinks of all body items, in iteration order."""
        return [
            str(node.hyperlink)
            for node, _ in document.iterate_items(root=document.body, with_groups=False)
            if getattr(node, 'hyperlink', None)
        ]

    def collect_repetitive_content(self, document: "DoclingDocument") -> None:
        """Count each distinct normalized text of *document* once.

        A text that appears several times inside the same document still
        increments its counter by one only, so the counter reflects the
        number of documents a text was seen in.
        """
        content_in_document = {
            self._normalize(node.text)
            for node, _ in document.iterate_items(root=document.body, with_groups=False)
            if getattr(node, 'text', None)
        }
        self._repetitions_counter.update(content_in_document)

    def perform_content_analysis(self, target_url: str = "index", url_filename: str = 'index', ) -> None:
        """Populate the list of repetitive texts, from cache or from counts.

        The analysis file is ``<url_filename>-content_analysis.json`` inside
        ``config.paths.SCRAPING_OUTPUT``. If ``full_scraping`` is falsy and
        that file exists, its ``repetitive_content`` entries are loaded.
        Otherwise every counted text that occurred more than once and is not
        in ``REPETITION_WHITELIST`` is recorded, logged and written to the
        file.

        Args:
            target_url: URL stored in the JSON file and used in log messages.
            url_filename: Prefix of the analysis file name.
        """
        target_url_filename = url_filename + '-content_analysis.json'
        target_url_path = os.path.join(config.paths.SCRAPING_OUTPUT, target_url_filename)

        if not self.full_scraping and os.path.exists(target_url_path):
            with open(target_url_path, 'r', encoding='utf-8') as f:
                content_analysis = json.load(f)
            entries = content_analysis['repetitive_content']
        else:
            entries = [{'content': text, 'amount': count}
                       for text, count in self._repetitions_counter.items()
                       if text not in REPETITION_WHITELIST and count > 1]

            logger.info(f"Content analysis for target URL '{target_url}' " +
                        f"yielded {len(entries)} repetitive text lines")

            content_analysis = {
                'target_url': target_url,
                'repetitive_content': entries,
            }
            with open(target_url_path, 'w', encoding='utf-8') as f:
                json.dump(content_analysis, f, indent=4)

            logger.info(f"Saved content analysis results under '{target_url_path}'")

        # Both branches yield {'content', 'amount'} records; only the text
        # itself is needed for cleaning.
        self._repetitive_content = [entry['content'] for entry in entries]

    def clean_document(self, document: "DoclingDocument") -> None:
        """Remove boilerplate nodes from *document* in place.

        The furniture children are cleared, then body items are removed when
        they are a known repetitive text, carry a caption shorter than 50
        characters, carry a hyperlink, or repeat the text of such a caption
        or hyperlink. Children of a removed node take its place in the
        parent.
        """
        document.furniture.children.clear()

        # Built once so the per-node membership tests are O(1).
        repetitive = frozenset(self._repetitive_content)
        texts_to_remove: set[str] = set()
        nodes_to_remove = []
        tagged_ids: set[int] = set()

        def tag(node) -> None:
            nodes_to_remove.append(node)
            tagged_ids.add(id(node))

        # Step 1: Shallow tagging of useless content
        for node, _ in document.iterate_items(root=document.body, with_groups=False):
            text = getattr(node, 'text', None)
            if text and self._normalize(text) in repetitive:
                tag(node)
                continue

            if getattr(node, 'captions', None):
                caption_text = node.caption_text(document).strip()
                if len(caption_text) < 50:
                    tag(node)
                    # Store the same normalized key that step 2 looks up;
                    # otherwise captions containing capitals never match.
                    caption_key = caption_text.lower()
                    if caption_key not in repetitive:
                        texts_to_remove.add(caption_key)
                    continue

            if getattr(node, 'hyperlink', None):
                tag(node)
                if text:
                    texts_to_remove.add(self._normalize(text))
                continue

        # Step 2: Removal of duplicates from other node types
        for node, _ in document.iterate_items(root=document.body, with_groups=False):
            if id(node) in tagged_ids:
                continue
            text = getattr(node, 'text', None)
            if text and self._normalize(text) in texts_to_remove:
                tag(node)

        # Step 3: Deletion of all useless nodes
        for node in nodes_to_remove:
            self._detach_node(document, node)

    @staticmethod
    def _detach_node(document: "DoclingDocument", node) -> None:
        """Unlink *node* from its parent, promoting its children in its place."""
        parent_ref = getattr(node, 'parent', None)
        if not parent_ref:
            return

        parent_node = parent_ref.resolve(document)
        node_ref = node.get_ref()
        if node_ref not in parent_node.children:
            return

        node_children_refs = list(node.children) if hasattr(node, 'children') else []
        idx = parent_node.children.index(node_ref)
        # Replace the node's slot with its children, keeping their order.
        parent_node.children[idx:idx + 1] = node_children_refs

        # Promote children of removed node to node's parent
        for child_ref in node_children_refs:
            child_node = child_ref.resolve(document)
            if hasattr(child_node, 'parent'):
                child_node.parent = parent_ref

        # Clean node references
        if hasattr(node, 'children'):
            node.children.clear()
        node.parent = None