File size: 5,597 Bytes
698965e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import json, os

from typing import Counter
from bs4 import BeautifulSoup
from docling_core.types.doc.document import DoclingDocument

from ..const.cc_whitelist import REPETITION_WHITELIST
from ..utils.logging import get_logger
from ..config import config

logger = get_logger('scraper.cleaning')

class ContentCleaner:
    """Detect and strip boilerplate content from scraped documents.

    The methods share state: ``collect_repetitive_content`` feeds a counter,
    ``perform_content_analysis`` turns that counter (or a previously saved
    analysis file) into the list of repetitive texts, and ``clean_document``
    uses that list to decide which nodes to drop.
    """

    def __init__(self, full_scraping: bool) -> None:
        """Create a cleaner with empty analysis state.

        Args:
            full_scraping: When truthy, ``perform_content_analysis`` always
                recomputes and rewrites the analysis file instead of reusing
                an existing one.
        """
        # Number of documents in which each normalised text was seen.
        self._repetitions_counter: Counter = Counter()
        # Normalised texts that ``clean_document`` treats as boilerplate.
        self._repetitive_content: list[str] = []
        self.full_scraping: bool = full_scraping

    @staticmethod
    def _normalize(text: str) -> str:
        """Return ``text`` stripped and lower-cased (the comparison form)."""
        return text.strip().lower()

    def clean_mobile_content(self, html: str) -> str:
        """Return ``html`` with every element of class ``show-sm`` removed."""
        # NOTE(review): 'html' is a parser *feature*, not a parser name, so
        # BeautifulSoup picks whichever HTML parser is installed. Consider
        # pinning 'html.parser' or 'lxml' for reproducible output -- confirm.
        soup = BeautifulSoup(html, 'html')
        for element in soup.find_all(class_='show-sm'):
            element.decompose()

        return str(soup)

    def extract_urls(self, document: DoclingDocument) -> list[str]:
        """Return the hyperlinks of all body items, as strings, in order."""
        return [
            str(node.hyperlink)
            for node, _ in document.iterate_items(root=document.body, with_groups=False)
            if getattr(node, 'hyperlink', None)
        ]

    def collect_repetitive_content(self, document: DoclingDocument) -> None:
        """Record which normalised texts occur in ``document``.

        Each distinct text is counted once per call, no matter how often it
        appears inside the document, so the counter holds the number of
        documents a text was seen in.
        """
        texts_in_document = {
            self._normalize(node.text)
            for node, _ in document.iterate_items(root=document.body, with_groups=False)
            if getattr(node, 'text', None)
        }
        self._repetitions_counter.update(texts_in_document)

    def perform_content_analysis(self, target_url: str = "index", url_filename: str = 'index') -> None:
        """Populate the list of repetitive texts, loading or saving it as JSON.

        The analysis file is ``<url_filename>-content_analysis.json`` inside
        ``config.paths.SCRAPING_OUTPUT``. If ``full_scraping`` is falsy and
        the file exists, it is loaded; otherwise every counted text that was
        seen more than once and is not in ``REPETITION_WHITELIST`` is written
        to that file together with ``target_url``.

        Either way ``self._repetitive_content`` ends up as a list of plain
        text strings.

        Args:
            target_url: URL recorded in the analysis file and log messages.
            url_filename: Prefix of the analysis file name.
        """
        analysis_path = os.path.join(
            config.paths.SCRAPING_OUTPUT, url_filename + '-content_analysis.json')

        if not self.full_scraping and os.path.exists(analysis_path):
            with open(analysis_path, 'r', encoding='utf-8') as f:
                records = json.load(f)['repetitive_content']
        else:
            records = [
                {'content': text, 'amount': count}
                for text, count in self._repetitions_counter.items()
                if text not in REPETITION_WHITELIST and count > 1
            ]
            logger.info(f"Content analysis for target URL '{target_url}' "
                        f"yielded {len(records)} repetitive text lines")

            content_analysis = {
                'target_url': target_url,
                'repetitive_content': records,
            }
            with open(analysis_path, 'w', encoding='utf-8') as f:
                json.dump(content_analysis, f, indent=4)
            logger.info(f"Saved content analysis results under '{analysis_path}'")

        # The file stores {'content', 'amount'} records, but clean_document
        # compares against bare strings, so reduce the records in BOTH
        # branches (a cache hit previously left the dicts in place).
        self._repetitive_content = [
            record['content'] if isinstance(record, dict) else record
            for record in records
        ]

    def clean_document(self, document: DoclingDocument) -> None:
        """Detach boilerplate nodes from ``document`` in place.

        Clears the furniture children, then removes from the body tree:

        * items whose normalised text is in the repetitive-content list,
        * items with captions whose caption text is shorter than 50 chars,
        * items carrying a hyperlink,
        * any other item whose normalised text equals the text of a removed
          caption or hyperlink item.

        Children of a removed node are promoted to the node's parent.
        """
        document.furniture.children.clear()

        # Built once so the per-node membership tests are O(1).
        repetitive = set(self._repetitive_content)

        # Step 1: Shallow tagging of useless content
        texts_to_remove = set()
        nodes_to_remove = []
        for node, _ in document.iterate_items(root=document.body, with_groups=False):
            text = getattr(node, 'text', None)
            if text and self._normalize(text) in repetitive:
                nodes_to_remove.append(node)
                continue
            if getattr(node, 'captions', None):
                caption_text = node.caption_text(document).strip()
                if len(caption_text) < 50:
                    nodes_to_remove.append(node)
                    # Stored lower-cased because step 2 compares
                    # normalised texts.
                    caption_key = caption_text.lower()
                    if caption_key and caption_key not in repetitive:
                        texts_to_remove.add(caption_key)
                    continue
            if getattr(node, 'hyperlink', None):
                nodes_to_remove.append(node)
                if text:
                    texts_to_remove.add(self._normalize(text))
                continue

        # Step 2: Removal of duplicates from other node types
        if texts_to_remove:
            for node, _ in document.iterate_items(root=document.body, with_groups=False):
                text = getattr(node, 'text', None)
                if text and self._normalize(text) in texts_to_remove:
                    nodes_to_remove.append(node)

        # Step 3: Deletion of all useless nodes. A node may be listed twice;
        # the second visit is a no-op because its parent is already unset.
        for node in nodes_to_remove:
            self._detach_node(document, node)

    @staticmethod
    def _detach_node(document: DoclingDocument, node) -> None:
        """Unlink ``node`` from its parent, promoting its children in place."""
        parent_ref = getattr(node, 'parent', None)
        if not parent_ref:
            return

        parent_node = parent_ref.resolve(document)
        try:
            idx = parent_node.children.index(node.get_ref())
        except ValueError:
            # The parent no longer lists this node; nothing to unlink.
            return

        promoted_refs = list(getattr(node, 'children', None) or [])
        # Replace the node's slot with its children, keeping document order.
        parent_node.children[idx:idx + 1] = promoted_refs

        # Promote children of removed node to node's parent
        for child_ref in promoted_refs:
            child_node = child_ref.resolve(document)
            if hasattr(child_node, 'parent'):
                child_node.parent = parent_ref

        # Clean node references
        if getattr(node, 'children', None):
            node.children.clear()
        node.parent = None