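"""HTML processing for the scraper pipeline: converts pages to DoclingDocuments,
chunks them, and merges the resulting chunks by heading and by topic."""
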
from docling.document_converter import InputFormat
from docling_core.types.doc.document import DoclingDocument, TitleItem

from .types import ChunkMetadata

from ..config import config
from ..pipeline.processors import ProcessorBase
from ..utils.logging import get_logger

logger = get_logger('scraper.processor')

class HTMLProcessor(ProcessorBase):
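    """Converts raw HTML into DoclingDocuments and prepares token-bounded,
    heading- and topic-aware text chunks for downstream processing."""
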
    def process(self, url: str, html_content: str) -> DoclingDocument | None:
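        """Convert an HTML string into a DoclingDocument; returns None on failure."""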
        if not html_content:
            logger.warning(f"Nothing to process for URL '{url}': HTML content is empty")
            return None

        logger.info(f"Analyzing page layout of URL '{url}'...")
        try:
            document = self._converter.convert_string(html_content, InputFormat.HTML).document
            document.name = url
            return document
        except Exception as e:
            logger.error(f"Failed to analyze page layout of URL '{url}': {e}")
            return None
    

    def prepare_chunks(self, url: str, url_text: str, metas: list[ChunkMetadata]) -> dict[str, list]:
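        """Bucket chunk texts by language and run the shared preparation step on each bucket."""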
        prepared_chunks = { lang: [] for lang in config.get('AVAILABLE_LANGUAGES', ['en', 'de']) }
        for meta in metas:
            # Skip languages outside the configured set instead of raising a KeyError
            if meta.language not in prepared_chunks:
                logger.warning(f"Skipping chunk with unsupported language '{meta.language}'")
                continue
            prepared_chunks[meta.language].append(meta.text)
        for lang, chunks in prepared_chunks.items():
            prepared_chunks[lang] = self._prepare_chunks(url, url_text, chunks)

        return prepared_chunks


    def extract_title(self, document: DoclingDocument) -> str:
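        """Return the text of the document's first TitleItem, or 'No Title'."""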
        titles = [title.text for title in document.texts if isinstance(title, TitleItem)]
        return titles[0] if titles else 'No Title'


    def chunk(self, document: DoclingDocument) -> list[dict]:
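        """Chunk the document, merge chunks that share parent headings, and return
        dicts holding each chunk's text, first-line title, and token size."""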
        raw_chunks = list(self._chunker.chunk(document))
        chunks = self._merge_chunks_by_headings(raw_chunks) 

        prepared_chunks = [{
            'text': chunk,
            'title': chunk.split('\n')[0],
            'size': self._chunker.tokenizer.count_tokens(chunk)
        } for chunk in chunks]

        return prepared_chunks
    

    def merge_chunks_by_topic(self, chunk_metadatas: list[ChunkMetadata]) -> list[ChunkMetadata]:
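        """Merge consecutive chunks that share a topic until MAX_TOKENS is reached;
        chunks already at or above the budget pass through unmerged."""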
        MAX_TOKENS = config.processing.MAX_TOKENS
        merged_chunks = []

        current_group  = []
        current_tokens = 0
        current_topic  = None

        for chunk in chunk_metadatas:
            topic      = chunk.topic
            token_size = chunk.token_size
            
            # Chunks already at or above the token budget are passed through unmerged
            if token_size >= MAX_TOKENS:
                # A large chunk closes the current consecutive group
                if current_group:
                    merged_chunks.append(self._create_merged_chunk(current_group))
                    current_group  = []
                    current_tokens = 0
                    current_topic  = None
                
                # Large chunk is appended here
                merged_chunks.append(chunk)
                continue

            if (current_topic and topic != current_topic) or (current_tokens + token_size > MAX_TOKENS):
                if current_group:
                    merged_chunks.append(self._create_merged_chunk(current_group))
                
                current_group  = [chunk]
                current_tokens = token_size
                current_topic  = topic
                continue

            # Same topic and still within the token budget: extend the current group
            current_group.append(chunk)
            current_tokens += token_size
            current_topic   = topic
        

        if current_group:
            merged_chunks.append(self._create_merged_chunk(current_group))
 
        return merged_chunks


    def _create_merged_chunk(self, group: list[ChunkMetadata]) -> ChunkMetadata:
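        """Concatenate a group of chunks into a single ChunkMetadata, carrying over
        the first chunk's metadata and recording the original chunk ids."""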
        if len(group) == 1:
            return group[0]

        merged_text  = "\n".join(cm.text for cm in group).strip()
        total_tokens = sum(cm.token_size for cm in group)

        first = group[0]
        last  = group[-1]

        merged_id = f"merged_{first.topic}_{first.chunk_id}_to_{last.chunk_id}"
        merged_chunk = ChunkMetadata(
            chunk_id           = merged_id,
            text               = merged_text,
            source_url         = first.source_url,
            program            = first.program,
            language           = first.language,
            topic              = first.topic,
            last_scraped       = first.last_scraped,
            page_title         = first.page_title,
            section_heading    = first.section_heading,
            token_size         = total_tokens,
            original_chunk_ids = [c.chunk_id for c in group],  
        )
        return merged_chunk


    def _get_formatted_chunk_text(self, chunk, headings: list[str]) -> str:
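        """Prefix the chunk text with its headings; newlines are flattened to spaces
        unless the chunk contains structured items (tables or list items)."""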
        formatted_text = f"{' '.join(headings)}\n"

        if not hasattr(chunk.meta, 'doc_items'):
            return formatted_text + chunk.text.replace('\n', ' ')

        # Keep the original line breaks for structured content such as tables and list items
        labels = {item.label for item in chunk.meta.doc_items}
        if labels & {'table', 'list_item'}:
            return formatted_text + chunk.text

        return formatted_text + chunk.text.replace('\n', ' ')


    def _merge_chunks_by_headings(self, raw_chunks: list) -> list[str]:
        """
        Groups consecutive chunks that share the same parent headings and merges them into one clean chunk.
        """
        prefix_level = 2
        merged = []
        i = 0
        n = len(raw_chunks)
        
        while i < n:
            chunk = raw_chunks[i]
            headings = getattr(chunk.meta, "headings", []) or []

            if len(headings) < prefix_level:
                formatted_text = self._get_formatted_chunk_text(chunk, headings) 
                merged.append(formatted_text)
                i += 1
                continue
            
            # Start a new group with this prefix
            common_prefix = "\n".join(headings[:prefix_level])
            group = []
            
            while i < n:
                curr_chunk = raw_chunks[i]
                curr_headings = getattr(curr_chunk.meta, "headings", []) or []
                curr_prefix = "\n".join(curr_headings[:prefix_level])
                
                if curr_prefix != common_prefix:
                    break 
                
                leaf_heading = curr_headings[-1] if len(curr_headings) > prefix_level else ""
                content = curr_chunk.text.replace('\n', ' ').strip()
                
                if leaf_heading and content:
                    group.append(f"{leaf_heading}: {content}")
                elif content:
                    group.append(content)
                
                i += 1
            
            # Build the final merged chunk; join outside the f-string, since
            # backslashes inside f-string expressions require Python 3.12+
            if len(group) > 1:
                heading_block = "\n".join(headings[1:-1])
                full_chunk = heading_block + "\n" + "\n".join(group)
            else:
                heading_block = "\n".join(headings[1:])
                full_chunk = heading_block + "\n" + chunk.text

            merged.append(full_chunk.strip())

        return merged