File size: 14,947 Bytes
0a4529c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
# DEPENDENCIES
import re
import html
import unicodedata
from typing import Optional, List
from config.logging_config import get_logger

# Setup Logger
logger = get_logger(__name__)


class TextCleaner:
    """
    Comprehensive text cleaning and normalization: Preserves semantic meaning while removing noise

    The compiled patterns and character tables below are class-level constants shared by the cleaning routines
    """
    # Common patterns
    URL_PATTERN         = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # The TLD class is [A-Za-z]: inside a character class '|' is a literal pipe, not alternation
    EMAIL_PATTERN       = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    PHONE_PATTERN       = re.compile(r'(\+\d{1,3}[-.\s]?)?(\(?\d{3}\)?[-.\s]?)?\d{3}[-.\s]?\d{4}')
    # Any run of whitespace (spaces, tabs, newlines)
    MULTIPLE_SPACES     = re.compile(r'\s+')
    # Three or more newlines, optionally separated by other whitespace
    MULTIPLE_NEWLINES   = re.compile(r'\n\s*\n\s*\n+')
    
    # HTML/XML patterns
    HTML_TAG_PATTERN    = re.compile(r'<[^>]+>')
    HTML_ENTITY_PATTERN = re.compile(r'&[a-zA-Z]+;|&#\d+;')
    
    # Special characters
    BULLET_POINTS       = ['•', '◦', '▪', '▫', '⬩', '▹', '▸', '►', '▻', '→']
    # Typographic quotes, spelled as escapes so they cannot be flattened to ASCII quotes by an editor or copy/paste:
    # left/right double, left/right single, double guillemets, single guillemets
    QUOTATION_MARKS     = ['\u201c', '\u201d', '\u2018', '\u2019', '\u00ab', '\u00bb', '\u2039', '\u203a']
    

    @classmethod
    def clean(cls, text: str, remove_urls: bool = False, remove_emails: bool = False, remove_phone_numbers: bool = False, remove_html: bool = True,
              normalize_whitespace: bool = True, normalize_quotes: bool = True, normalize_bullets: bool = True, lowercase: bool = False, 
              remove_extra_newlines: bool = True, preserve_structure: bool = True) -> str:
        """
        Clean text with configurable options
        
        Arguments:
        ----------
            text                  { str }  : Input text
            
            remove_urls           { bool } : Remove URLs
            
            remove_emails         { bool } : Remove email addresses
            
            remove_phone_numbers  { bool } : Remove phone numbers
            
            remove_html           { bool } : Remove HTML tags
            
            normalize_whitespace  { bool } : Normalize spaces/tabs
            
            normalize_quotes      { bool } : Convert fancy quotes to standard
            
            normalize_bullets     { bool } : Convert bullet points to standard
            
            lowercase             { bool } : Convert to lowercase
            
            remove_extra_newlines { bool } : Remove excessive blank lines
            
            preserve_structure    { bool } : Try to maintain document structure
        
        Returns:
        --------
                        { str }            : Cleaned text
        """
        if not text or not text.strip():
            return ""
        
        # Original length for logging
        original_length = len(text)
        
        # Remove HTML if present
        if remove_html:
            text = cls.remove_html_tags(text)
            text = cls.decode_html_entities(text)
        
        # Remove specific patterns
        if remove_urls:
            text = cls.URL_PATTERN.sub(' ', text)
        
        if remove_emails:
            text = cls.EMAIL_PATTERN.sub(' ', text)
        
        if remove_phone_numbers:
            text = cls.PHONE_PATTERN.sub(' ', text)
        
        # Normalize unicode
        text = cls.normalize_unicode(text)
        
        # Normalize quotes
        if normalize_quotes:
            text = cls.normalize_quotation_marks(text)
        
        # Normalize bullets
        if normalize_bullets:
            text = cls.normalize_bullet_points(text)
        
        # Handle whitespace
        if normalize_whitespace:
            # Replace tabs with spaces
            text = text.replace('\t', '    ')
            
            # Normalize spaces (but not newlines if preserving structure)
            if preserve_structure:
                lines = text.split('\n')
                lines = [cls.MULTIPLE_SPACES.sub(' ', line) for line in lines]
                text  = '\n'.join(lines)
           
            else:
                text = cls.MULTIPLE_SPACES.sub(' ', text)
        
        # Remove extra newlines
        if remove_extra_newlines:
            text = cls.MULTIPLE_NEWLINES.sub('\n\n', text)
        
        # Lowercase if requested
        if lowercase:
            text = text.lower()
        
        # Final cleanup
        text           = text.strip()
        
        # Log cleaning stats
        cleaned_length = len(text)
        reduction      = ((original_length - cleaned_length) / original_length * 100) if (original_length > 0) else 0

        logger.debug(f"Text cleaned: {original_length} -> {cleaned_length} chars ({reduction:.1f}% reduction)")
        
        return text
    

    @classmethod
    def remove_html_tags(cls, text: str) -> str:
        """
        Remove HTML tags
        """
        return cls.HTML_TAG_PATTERN.sub('', text)
    

    @classmethod
    def decode_html_entities(cls, text: str) -> str:
        """
        Decode HTML entities
        """
        return html.unescape(text)
    

    @classmethod
    def normalize_unicode(cls, text: str) -> str:
        """
        Normalize unicode characters : Converts to NFC form (canonical composition)
        """
        return unicodedata.normalize('NFC', text)
    

    @classmethod
    def normalize_quotation_marks(cls, text: str) -> str:
        """
        Convert fancy quotes to standard ASCII quotes
        """
        for fancy_quote in cls.QUOTATION_MARKS:
            if (fancy_quote in ['"', '"', '«', '»']):
                text = text.replace(fancy_quote, '"')
            
            elif (fancy_quote in [''', ''', '‹', '›']):
                text = text.replace(fancy_quote, "'")

        return text
    

    @classmethod
    def normalize_bullet_points(cls, text: str) -> str:
        """
        Convert various bullet points to standard bullet
        """
        for bullet in cls.BULLET_POINTS:
            text = text.replace(bullet, '•')
        
        return text
    

    @classmethod
    def remove_boilerplate(cls, text: str, remove_headers: bool = True, remove_footers: bool = True, remove_page_numbers: bool = True) -> str:
        """
        Remove common boilerplate text
        
        Arguments:
        ----------
            text                 { str } : Input text
            
            remove_headers      { bool } : Remove common header patterns
            
            remove_footers      { bool } : Remove common footer patterns
            
            remove_page_numbers { bool } : Remove standalone page numbers
        
        Returns:
        --------
                    { str }              : Text without boilerplate
        """
        lines         = text.split('\n')
        cleaned_lines = list()
        
        for line in lines:
            line_stripped = line.strip()
            
            # Skip empty lines
            if not line_stripped:
                cleaned_lines.append(line)
                continue
            
            # Remove page numbers (lines that are just numbers)
            if remove_page_numbers and line_stripped.isdigit():
                continue
            
            # Remove common header patterns
            if remove_headers:
                header_patterns = [r'^Page \d+ of \d+$', r'^\d+/\d+$', r'^Header:', r'^Draft', r'^Confidential']
                
                if (any(re.match(pattern, line_stripped, re.IGNORECASE) for pattern in header_patterns)):
                    continue
            
            # Remove common footer patterns
            if remove_footers:
                footer_patterns = [r'^Copyright ©', r'^All rights reserved', r'^Footer:', r'^\d{4} .+ Inc\.']
                
                if any(re.match(pattern, line_stripped, re.IGNORECASE) for pattern in footer_patterns):
                    continue
            
            cleaned_lines.append(line)
        
        return '\n'.join(cleaned_lines)
    

    @classmethod
    def extract_sentences(cls, text: str) -> List[str]:
        """
        Split text into sentences : Handles common abbreviations and edge cases
        
        Arguments:
        ----------
            text { str } : Input text
        
        Returns:
        --------
            { list }     : List of sentences
        """
        # Common abbreviations that shouldn't trigger sentence breaks
        abbreviations    = {'Dr.', 'Mr.', 'Mrs.', 'Ms.', 'Jr.', 'Sr.', 'Prof.', 'Inc.', 'Ltd.', 'Corp.', 'Co.', 'vs.', 'etc.', 'e.g.', 'i.e.', 'Ph.D.', 'M.D.', 'B.A.', 'M.A.', 'U.S.', 'U.K.'}
        
        # Protect abbreviations
        protected_text = text

        for abbr in abbreviations:
            protected_text = protected_text.replace(abbr, abbr.replace('.', '<DOT>'))
        
        # Split on sentence boundaries
        sentence_pattern = r'(?<=[.!?])\s+(?=[A-Z])'
        sentences        = re.split(sentence_pattern, protected_text)
        
        # Restore abbreviations
        sentences        = [s.replace('<DOT>', '.') for s in sentences]
        
        # Clean and filter
        sentences        = [s.strip() for s in sentences if s.strip()]
        
        return sentences
    

    @classmethod
    def truncate(cls, text: str, max_length: int, suffix: str = "...", word_boundary: bool = True) -> str:
        """
        Truncate text to maximum length
        
        Arguments:
        ----------
            text          { str }  : Input text

            max_length    { int }  : Maximum length
            
            suffix        { str }  : Suffix to append when truncated
            
            word_boundary { bool } : Truncate at word boundary
        
        Returns:
        --------
                  { str }          : Truncated text
        """
        if (len(text) <= max_length):
            return text
        
        # Account for suffix
        max_length -= len(suffix)
        
        if word_boundary:
            # Find last space before max_length
            truncated  = text[:max_length]
            last_space = truncated.rfind(' ')
            
            if (last_space > 0):
                truncated = truncated[:last_space]
        
        else:
            truncated = text[:max_length]
        
        return truncated + suffix
    

    @classmethod
    def remove_special_characters(cls, text: str, keep_punctuation: bool = True, keep_numbers: bool = True) -> str:
        """
        Remove special characters
        
        Arguments:
        ----------
            text             { str }  : Input text

            keep_punctuation { bool } : Keep basic punctuation
            
            keep_numbers     { bool } : Keep numbers
        
        Returns:
        --------
                    { str }           : Text with special characters removed
        """
        if keep_punctuation and keep_numbers:
            # Keep alphanumeric and basic punctuation
            pattern = r'[^a-zA-Z0-9\s.,!?;:\'-]'

        elif keep_punctuation:
            # Keep letters and punctuation
            pattern = r'[^a-zA-Z\s.,!?;:\'-]'

        elif keep_numbers:
            # Keep letters and numbers
            pattern = r'[^a-zA-Z0-9\s]'

        else:
            # Keep only letters
            pattern = r'[^a-zA-Z\s]'
        
        return re.sub(pattern, '', text)

    
    @classmethod
    def deduplicate_lines(cls, text: str, preserve_order: bool = True) -> str:
        """
        Remove duplicate lines
        
        Arguments:
        ----------
            text           { str }  : Input text

            preserve_order { bool } : Maintain original order
        
        Returns:
        --------
                  { str }           : Text with duplicate lines removed
        """
        lines = text.split('\n')
        
        if preserve_order:
            seen         = set()
            unique_lines = list()

            for line in lines:
                if line not in seen:
                    seen.add(line)
                    unique_lines.append(line)

        else:
            unique_lines = list(set(lines))
        
        return '\n'.join(unique_lines)

    
    @classmethod
    def count_tokens_estimate(cls, text: str) -> int:
        """
        Estimate token count: Rule of thumb is - ~4 characters per token for English.
        
        Arguments:
        ----------
            text { str } : Input text
        
        Returns:
        --------
            { int }      : Estimated token count
        """
        # More accurate estimation
        words         = text.split()
        chars         = len(text)
        
        # Average of word-based and char-based estimates
        word_estimate = len(words) * 1.3  # ~1.3 tokens per word

        # ~4 chars per token
        char_estimate = chars / 4  
        
        return int((word_estimate + char_estimate) / 2)
    

    @classmethod
    def preserve_structure_markers(cls, text: str) -> str:
        """
        Identify and mark structural elements: Useful for semantic chunking
        
        Arguments:
        ----------
            text { str } : Input text
        
        Returns:
        --------
             { str }     : Text with structure markers
        """
        lines        = text.split('\n')
        marked_lines = list()
        
        for line in lines:
            stripped = line.strip()
            
            # Mark headers (ALL CAPS, short lines)
            if (stripped.isupper() and (len(stripped) < 100)):
                marked_lines.append(f"[HEADER] {line}")
            
            # Mark list items
            elif re.match(r'^[\d•\-\*]\s', stripped):
                marked_lines.append(f"[LIST] {line}")

            # Regular text
            else:
                marked_lines.append(line)
        
        return '\n'.join(marked_lines)


def clean_for_rag(text: str) -> str:
    """
    Convenience function: run TextCleaner.clean with the option set chosen for RAG ingestion

    Arguments:
    ----------
        text { str } : Input text

    Returns:
    --------
         { str }     : Cleaned text
    """
    rag_options = {"remove_urls"           : False,  # URLs might be useful context
                   "remove_emails"         : False,  # Emails might be useful
                   "remove_phone_numbers"  : False,  # Phone numbers might be useful
                   "remove_html"           : True,
                   "normalize_whitespace"  : True,
                   "normalize_quotes"      : True,
                   "normalize_bullets"     : True,
                   "lowercase"             : False,  # Keep original casing for proper nouns
                   "remove_extra_newlines" : True,
                   "preserve_structure"    : True,   # Important for chunking
                  }

    return TextCleaner.clean(text, **rag_options)