# Revision 698965e - file size: 5,597 bytes
import json, os
from typing import Counter
from bs4 import BeautifulSoup
from docling_core.types.doc.document import DoclingDocument
from ..const.cc_whitelist import REPETITION_WHITELIST
from ..utils.logging import get_logger
from ..config import config
# Module-wide logger, obtained from the project's get_logger helper with the
# name 'scraper.cleaning'.
logger = get_logger('scraper.cleaning')
class ContentCleaner:
    """Remove boilerplate from scraped HTML and from Docling documents.

    Intended call order, inferred from the state each method reads and
    writes: ``collect_repetitive_content`` once per scraped document, then
    ``perform_content_analysis`` to derive (or reload) the repeated text
    lines, then ``clean_document`` on each document to prune them.
    """

    def __init__(self, full_scraping: bool) -> None:
        """Create a cleaner.

        Args:
            full_scraping: When false, ``perform_content_analysis`` reuses a
                previously saved analysis file if one exists instead of
                recomputing it from the collected counts.
        """
        # Number of documents in which each normalised text line was seen.
        self._repetitions_counter: Counter = Counter()
        # Normalised text lines treated as boilerplate by clean_document().
        self._repetitive_content: list[str] = []
        self.full_scraping: bool = full_scraping

    @staticmethod
    def _normalise(text: str) -> str:
        """Return the canonical form used for every text comparison."""
        return text.strip().lower()

    def clean_mobile_content(self, html: str) -> str:
        """Return *html* with every element carrying class ``show-sm`` removed.

        Args:
            html: Raw HTML markup.

        Returns:
            The re-serialised markup after the matching elements were dropped.
        """
        # NOTE(review): 'html' is a parser *feature*, not a parser name, so
        # BeautifulSoup picks whichever HTML parser is installed; the
        # serialised output can therefore differ between environments.
        soup = BeautifulSoup(html, 'html')
        for element in soup.find_all(class_='show-sm'):
            element.decompose()
        return str(soup)

    def extract_urls(self, document: "DoclingDocument") -> list[str]:
        """Return the hyperlink of every body item that has one, as strings.

        Items are visited in ``iterate_items`` order; duplicates are kept.
        """
        return [
            str(node.hyperlink)
            for node, _ in document.iterate_items(root=document.body, with_groups=False)
            if getattr(node, 'hyperlink', None)
        ]

    def collect_repetitive_content(self, document: "DoclingDocument") -> None:
        """Record which normalised text lines occur in *document*.

        Each distinct line is counted once per document, so the counter ends
        up holding the number of documents a line appeared in.
        """
        lines_in_document = {
            self._normalise(node.text)
            for node, _ in document.iterate_items(root=document.body, with_groups=False)
            if getattr(node, 'text', None)
        }
        # Counter.update() adds one per element of the iterable.
        self._repetitions_counter.update(lines_in_document)

    def perform_content_analysis(self, target_url: str = "index", url_filename: str = 'index') -> None:
        """Populate the list of repetitive text lines, from disk or from counts.

        If ``full_scraping`` is false and an analysis file already exists, its
        entries are loaded. Otherwise every counted line that is seen in more
        than one document and is not whitelisted becomes an entry, and the
        result is written to ``<url_filename>-content_analysis.json`` inside
        ``config.paths.SCRAPING_OUTPUT``.

        Args:
            target_url: URL stored in the analysis file and used in log output.
            url_filename: Filename stem of the analysis file.
        """
        analysis_path = os.path.join(
            config.paths.SCRAPING_OUTPUT, url_filename + '-content_analysis.json'
        )

        if not self.full_scraping and os.path.exists(analysis_path):
            with open(analysis_path, 'r', encoding='utf-8') as f:
                entries = json.load(f)['repetitive_content']
        else:
            entries = [
                {'content': text, 'amount': count}
                for text, count in self._repetitions_counter.items()
                if text not in REPETITION_WHITELIST and count > 1
            ]
            logger.info(f"Content analysis for target URL '{target_url}' " +
                        f"yielded {len(entries)} repetitive text lines")
            content_analysis = {
                'target_url': target_url,
                'repetitive_content': entries,
            }
            with open(analysis_path, 'w', encoding='utf-8') as f:
                json.dump(content_analysis, f, indent=4)
            logger.info(f"Saved content analysis results under '{analysis_path}'")

        # Only the text is needed for cleaning; assigning once at the end
        # keeps the attribute from ever holding the intermediate dict entries.
        self._repetitive_content = [entry['content'] for entry in entries]

    def clean_document(self, document: "DoclingDocument") -> None:
        """Prune boilerplate nodes from *document* in place.

        Removed are: items whose text is a known repetitive line, items whose
        caption is shorter than 50 characters, items carrying a hyperlink, and
        any other item whose text equals the caption or link text of a removed
        item. Children of a removed node are re-attached to its parent at the
        same position. The furniture subtree is emptied as well.
        """
        document.furniture.children.clear()

        # Set membership instead of scanning the list for every node.
        repetitive = set(self._repetitive_content)
        texts_to_remove: set[str] = set()
        nodes_to_remove = []

        # Step 1: Shallow tagging of useless content
        for node, _ in document.iterate_items(root=document.body, with_groups=False):
            text = getattr(node, 'text', None)

            if text and self._normalise(text) in repetitive:
                nodes_to_remove.append(node)
                continue

            if getattr(node, 'captions', None):
                caption_text = node.caption_text(document).strip()
                if len(caption_text) < 50:
                    nodes_to_remove.append(node)
                    # Stored lowercased so that step 2, which compares
                    # normalised text, can actually find the caption node.
                    caption_key = caption_text.lower()
                    if caption_key not in repetitive:
                        texts_to_remove.add(caption_key)
                    continue

            if getattr(node, 'hyperlink', None):
                nodes_to_remove.append(node)
                if text:
                    # Same normalisation as step 2; the raw text would miss
                    # duplicates differing in case or surrounding whitespace.
                    texts_to_remove.add(self._normalise(text))

        # Step 2: Removal of duplicates from other node types
        for node, _ in document.iterate_items(root=document.body, with_groups=False):
            text = getattr(node, 'text', None)
            if text and self._normalise(text) in texts_to_remove:
                nodes_to_remove.append(node)

        # Step 3: Deletion of all useless nodes
        for node in nodes_to_remove:
            parent_ref = getattr(node, 'parent', None)
            if not parent_ref:
                # No parent, or already detached: a node tagged in both
                # steps appears twice in the list and ends up here.
                continue

            parent_node = parent_ref.resolve(document)
            node_ref = node.get_ref()
            if node_ref not in parent_node.children:
                continue

            # Promote children of removed node to node's parent, in place.
            child_refs = list(getattr(node, 'children', []))
            idx = parent_node.children.index(node_ref)
            parent_node.children[idx:idx + 1] = child_refs
            for child_ref in child_refs:
                child_node = child_ref.resolve(document)
                if hasattr(child_node, 'parent'):
                    child_node.parent = parent_ref

            # Clean node references
            if hasattr(node, 'children'):
                node.children.clear()
            node.parent = None
|