File size: 14,854 Bytes
c0f31c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""

BaseChunker.py



An abstract base class defining the interface for document chunking strategies.

"""

import logging
from core.OCREnhancedPDFLoader import OCREnhancedPDFLoader
from core.TextPreprocessor import TextPreprocessor
import numpy as np
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Union

import spacy
from langchain_core.documents import Document

# Import tiktoken at the module level
try:
    import tiktoken
    TIKTOKEN_AVAILABLE = True
except ImportError:
    TIKTOKEN_AVAILABLE = False
    logging.warning("tiktoken not installed. Some tokenization features will be limited. "
                  "Install with: pip install tiktoken")

logger = logging.getLogger(__name__)

class BaseChunker(ABC):
    """Abstract base class for document chunking strategies.

    Concrete subclasses implement :meth:`process_document` (and typically
    override :meth:`process_text_file`) to split documents into chunks.
    This base provides the shared services: tokenizer selection and token
    counting, embedding generation, text analysis/validation, and document
    loading/cleaning helpers.
    """

    # Common constants
    BLANK_THRESHOLD = 20  # Minimum characters for non-blank text
    TOKEN_THRESHOLD = 10  # Minimum tokens for valid content

    # Model type indicators
    TIKTOKEN_MODELS = ["gpt", "davinci", "curie", "babbage", "ada"]
    BASIC_TOKENIZER_MODELS = ["llama", "mistral", "granite"]

    # Encoding names that tiktoken.get_encoding() accepts directly.
    _TIKTOKEN_ENCODINGS = ("cl100k_base", "p50k_base", "r50k_base", "gpt2")

    class _TiktokenWrapper:
        """Adapter giving a tiktoken encoding the `.tokenize()` interface
        that count_tokens() expects (same shape as a transformers tokenizer)."""

        def __init__(self, encoding):
            self.encoding = encoding

        def tokenize(self, text):
            # tiktoken returns token ids; only the count is used downstream.
            return self.encoding.encode(text)

    def __init__(self, model_name: Optional[str] = None, embedding_model: Optional[Any] = None):
        """Initialize base chunker with model settings.

        Args:
            model_name: Name of the model (or tiktoken encoding) used for
                tokenization. When None, basic word-split counting is used.
            embedding_model: Object exposing `.encode(text)` for embeddings,
                or None to disable embedding generation.
        """
        self.model_name = model_name
        self.embedding_model = embedding_model
        self.uses_tiktoken = False
        self.uses_basic_tokenizer = False
        self.tokenizer = None
        self._initialize_tokenizer()

        # NLP pipeline used for sentence segmentation in analyze_text().
        self.nlp = spacy.load("en_core_web_sm")

    def _install_tiktoken(self, encoding_name: str) -> bool:
        """Try to install a tiktoken encoding as the active tokenizer.

        Args:
            encoding_name: A tiktoken encoding name (e.g. "cl100k_base").

        Returns:
            True if the tokenizer was installed, False on any failure.
        """
        try:
            encoding = tiktoken.get_encoding(encoding_name)
        except Exception as e:
            logger.warning(f"Error initializing tiktoken encoding '{encoding_name}': {e}")
            return False
        self.tokenizer = self._TiktokenWrapper(encoding)
        self.uses_tiktoken = True
        logger.info(f"Initialized tiktoken tokenizer ({encoding_name}) for model: {self.model_name}")
        return True

    def _initialize_tokenizer(self):
        """Initialize the appropriate tokenizer based on the model name.

        Resolution order:
          1. No model name            -> basic (word-split) tokenization.
          2. Known tiktoken encoding  -> use it directly.
          3. OpenAI-family model name -> tiktoken, falling back to cl100k_base.
          4. BASIC_TOKENIZER_MODELS   -> basic tokenization.
          5. Anything else            -> transformers AutoTokenizer,
                                         falling back to basic tokenization.
        """
        if not self.model_name:
            logger.warning("No model name provided. Using basic tokenization.")
            self.uses_basic_tokenizer = True
            return

        lowered = self.model_name.lower()

        # Case 2: the model name is itself a tiktoken encoding name.
        if TIKTOKEN_AVAILABLE and self.model_name in self._TIKTOKEN_ENCODINGS:
            # Early return on success prevents the fall-through that
            # previously re-ran initialization and clobbered the tokenizer.
            if self._install_tiktoken(self.model_name) or self._install_tiktoken("cl100k_base"):
                return
            self.uses_basic_tokenizer = True
            return

        # Case 3: model names that indicate the OpenAI/tiktoken family.
        if TIKTOKEN_AVAILABLE and (
            any(model in lowered for model in self.TIKTOKEN_MODELS)
            or self.model_name.startswith("gpt-")
            or self.model_name.endswith("-base")
        ):
            if self._install_tiktoken(self.model_name) or self._install_tiktoken("cl100k_base"):
                return
            self.uses_basic_tokenizer = True
            return

        # Case 4: model families without a cheap local tokenizer.
        if any(model in lowered for model in self.BASIC_TOKENIZER_MODELS):
            self.uses_basic_tokenizer = True
            logger.info("Using basic tokenization for model")
            return

        # Case 5: fall back to a transformers tokenizer.
        try:
            from transformers import AutoTokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            logger.info(f"Initialized transformers tokenizer for model: {self.model_name}")
        except Exception as e:
            logger.warning(f"Error initializing transformer tokenizer: {e}")
            logger.warning("Falling back to basic tokenization")
            self.uses_basic_tokenizer = True

    def count_tokens(self, text: str) -> int:
        """Count tokens in a text string using the available tokenizer.

        Falls back to word-count, then to a rough chars/4 estimate, when
        no real tokenizer is usable.
        """
        if not text:
            return 0

        try:
            # Both the tiktoken wrapper and transformers tokenizers expose
            # .tokenize(); only the length of the result is needed.
            if self.tokenizer:
                return len(self.tokenizer.tokenize(text))
        except Exception as e:
            logger.warning(f"Primary tokenization failed: {e}")

        # Basic tokenization fallback: simple word-count approximation.
        if self.uses_basic_tokenizer or not self.tokenizer:
            return len(text.split())

        # Last resort: rough character-to-token ratio.
        return len(text) // 4

    def get_embedding(self, text: str) -> Optional[np.ndarray]:
        """Generate an embedding vector for text.

        Returns None for blank text, when no embedding model is configured,
        or when the model raises.
        """
        if not text.strip() or not self.embedding_model:
            return None

        try:
            return self.embedding_model.encode(text)
        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            return None

    def analyze_text(self, text: str) -> Dict[str, Any]:
        """Perform detailed analysis of text content.

        Returns a dict with char/token/sentence/word counts, the embedding
        dimensionality (0 when unavailable), and a has_content flag. On
        analysis failure, counts that require the NLP pipeline are zeroed.
        """
        if not text.strip():
            return {
                "char_count": 0,
                "token_count": 0,
                "sentence_count": 0,
                "word_count": 0,
                "embedding_dim": 0,
                "has_content": False
            }

        try:
            embedding = self.get_embedding(text)
            doc = self.nlp(text)

            return {
                "char_count": len(text),
                "token_count": self.count_tokens(text),
                "sentence_count": len(list(doc.sents)),
                "word_count": len(text.split()),
                "embedding_dim": len(embedding) if embedding is not None else 0,
                "has_content": bool(text.strip())
            }

        except Exception as e:
            logger.error(f"Error analyzing text: {e}")
            # Degraded result: only the pipeline-free metrics are reliable.
            return {
                "char_count": len(text),
                "token_count": 0,
                "sentence_count": 0,
                "word_count": len(text.split()),
                "embedding_dim": 0,
                "has_content": bool(text.strip())
            }

    def is_content_valid(self, text: str, min_chars: Optional[int] = None, min_tokens: Optional[int] = None) -> bool:
        """Check if content meets minimum length requirements.

        Args:
            text: The text to validate.
            min_chars: Minimum stripped character count (defaults to
                BLANK_THRESHOLD). An explicit 0 is honored.
            min_tokens: Minimum token count (defaults to TOKEN_THRESHOLD).
                An explicit 0 is honored.
        """
        if not text.strip():
            return False

        # `is None` (not `or`) so that an explicit 0 disables the check
        # instead of silently reverting to the class default.
        if min_chars is None:
            min_chars = self.BLANK_THRESHOLD
        if min_tokens is None:
            min_tokens = self.TOKEN_THRESHOLD

        if len(text.strip()) < min_chars:
            return False

        return self.count_tokens(text) >= min_tokens

    def validate_documents(self, documents):
        """Validate documents before sending to a vector database.

        Drops empty documents and strips BOM/zero-width characters from the
        start of each remaining document's content (mutating in place).

        Returns:
            The list of surviving (cleaned) documents.
        """
        valid_documents = []

        for i, doc in enumerate(documents):
            # Skip documents whose content is empty or pure whitespace.
            if not doc.page_content or not doc.page_content.strip():
                logger.info(f"Skipping document {i}: Empty content")
                continue

            # Remove any potential BOM or invisible characters at the start.
            doc.page_content = doc.page_content.lstrip(
                '\ufeff\u200b\u200c\u200d\u200e\u200f\u2060'
            )

            valid_documents.append(doc)

        logger.info(f"Validated {len(valid_documents)} of {len(documents)} documents")
        return valid_documents

    def debug_documents(self, documents, num_chars=50):
        """Log diagnostic information about documents.

        Args:
            documents: Documents to inspect.
            num_chars: Number of leading characters to preview per document.
        """
        logger.debug(f"DEBUG INFO: Examining {len(documents)} documents")

        for i, doc in enumerate(documents):
            content = doc.page_content
            if not content:
                logger.debug(f"  Doc {i}: EMPTY CONTENT")
                continue

            # Show the first few characters with their code points to make
            # invisible/mis-encoded characters visible in the log.
            first_chars = content[:num_chars]
            char_codes = [f"{c}({ord(c)})" for c in first_chars[:10]]

            logger.debug(f"  Doc {i}: Length={len(content)}, First chars: {''.join(char_codes)}")
            logger.debug(f"    Preview: {first_chars!r}")

        logger.debug("DEBUG INFO END")

    def load_document(self, file_path: str) -> List[Document]:
        """Load a document using OCREnhancedPDFLoader.

        Args:
            file_path: Path to the PDF file.

        Returns:
            Validated (cleaned, non-empty) Document objects.

        Raises:
            Exception: Re-raises any loader failure after logging it.
        """
        try:
            loader = OCREnhancedPDFLoader(file_path)
            documents = loader.load()
            self.debug_documents(documents)
            return self.validate_documents(documents)
        except Exception as e:
            logger.error(f"Error loading document: {e}")
            raise

    def preprocess_text(self, text: str, remove_headers_footers: bool = True) -> str:
        """Preprocess text using TextPreprocessor.

        Returns the original text unchanged if preprocessing fails
        (best-effort by design).
        """
        try:
            preprocessor = TextPreprocessor()
            return preprocessor.preprocess(text, remove_headers_footers)
        except Exception as e:
            logger.error(f"Error preprocessing text: {e}")
            return text

    @abstractmethod
    def process_document(self, file_path: str, preprocess: bool = True) -> Union[List[Document], Dict[str, List[Document]]]:
        """Process a document using the subclass's chunking strategy."""
        pass

    def load_text_file(self, file_path: str) -> str:
        """Load raw text file content.

        Args:
            file_path: Path to the text file.

        Returns:
            Raw text content (UTF-8 decoded).

        Raises:
            Exception: Re-raises any read/decode failure after logging it.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            logger.info(f"Loaded text file: {file_path} ({len(content)} characters)")
            return content
        except Exception as e:
            logger.error(f"Error loading text file {file_path}: {e}")
            raise

    def clean_text_for_processing(self, text: str) -> str:
        """Clean text using Unicode character replacement (same mapping as
        the PDF conversion logic).

        Args:
            text: Raw text content.

        Returns:
            Cleaned text with smart quotes, dashes, ellipses, arrows, and
            zero-width/non-breaking characters normalized to ASCII.
        """
        replacements = {
            '\u2019': "'", '\u2018': "'", '\u201c': '"', '\u201d': '"',
            '\u2014': '-', '\u2013': '-', '\u2026': '...',
            '\u200b': '', '\u00a0': ' ', '\u2022': '*',
            '\u2192': '->', '\u2190': '<-',
        }

        # Single C-level pass instead of N chained .replace() calls.
        return text.translate(str.maketrans(replacements))

    def process_text_file(self, file_path: str, preprocess: bool = True) -> List[Document]:
        """Default text file processing method; must be overridden by
        specific chunkers (e.g. ParagraphChunker, TokenChunker).

        Args:
            file_path: Path to the text file.
            preprocess: Whether to preprocess the text.

        Returns:
            List of Document objects.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement process_text_file method")