File size: 18,550 Bytes
5b89d45
8755993
 
5b89d45
 
 
a3bdcf1
 
 
 
5b89d45
 
 
 
 
a3bdcf1
2156541
 
 
 
 
 
5b89d45
 
 
 
 
 
 
e82a017
 
 
 
 
5b89d45
 
8755993
 
 
5b89d45
8755993
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
6d5c110
5b89d45
 
 
6d5c110
 
 
 
 
 
 
 
 
 
5b89d45
 
 
 
8755993
5b89d45
 
6d5c110
5b89d45
6d5c110
5b89d45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
511ccc3
 
 
 
 
 
 
 
 
5b89d45
8755993
 
5b89d45
 
 
 
 
 
 
 
511ccc3
5b89d45
 
511ccc3
5b89d45
 
511ccc3
 
5b89d45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8755993
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
 
 
 
 
 
 
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
 
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
 
 
 
 
 
 
 
 
 
 
511ccc3
5b89d45
 
 
 
511ccc3
 
 
 
5b89d45
511ccc3
 
 
 
5b89d45
 
 
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
 
5b89d45
511ccc3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8755993
 
a3bdcf1
8755993
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
import os
from typing import List, Optional
from pathlib import Path
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from code_chatbot.ingestion.chunker import StructuralChunker
from code_chatbot.ingestion.merkle_tree import MerkleTree, ChangeSet
from code_chatbot.core.path_obfuscator import PathObfuscator
from code_chatbot.core.config import get_config
import shutil
import logging

logger = logging.getLogger(__name__)

from code_chatbot.core.db_connection import (
    get_chroma_client, 
    reset_chroma_clients, 
    set_active_vector_db, 
    get_next_fallback_db,
    VECTOR_DB_FALLBACK_ORDER
)


class Indexer:
    """
    Indexes code files into a Vector Database.

    Documents are split semantically with StructuralChunker, embedded via
    Gemini (API) or a local HuggingFace model, and stored in Chroma, FAISS
    or Qdrant. Chroma failures (corruption, tenant/sqlite errors) fall back
    to FAISS automatically.
    """
    def __init__(self, persist_directory: str = None, embedding_function=None, provider: str = "gemini", api_key: str = None):
        """Initialize chunker, change detection, path obfuscation and embeddings.

        Args:
            persist_directory: Directory for the persisted vector store.
                Defaults to <tmpdir>/vector_db — Hugging Face Spaces only
                allow writes under /tmp.
            embedding_function: Pre-built embedding function; when given it
                overrides ``provider``.
            provider: 'gemini' (API) or 'local'/'huggingface' (on-device).
            api_key: Google API key; falls back to the GOOGLE_API_KEY env var.

        Raises:
            ValueError: If the provider is unsupported, or the Gemini
                provider is selected without an API key.
        """
        # Use /tmp for Hugging Face compatibility (they only allow writes to /tmp)
        import tempfile
        self.persist_directory = persist_directory or os.path.join(tempfile.gettempdir(), "vector_db")
        os.makedirs(self.persist_directory, exist_ok=True)
        self.provider = provider

        # Load configuration
        self.config = get_config()

        # Structural chunker: semantic splitting instead of fixed-size windows
        self.chunker = StructuralChunker(max_tokens=self.config.chunking.max_chunk_tokens)

        # Merkle tree enables cheap change detection for incremental indexing
        self.merkle_tree = MerkleTree(ignore_patterns=self.config.indexing.ignore_patterns)

        # Optionally obfuscate file paths stored in chunk metadata (privacy)
        self.path_obfuscator: Optional[PathObfuscator] = None
        if self.config.privacy.enable_path_obfuscation:
            self.path_obfuscator = PathObfuscator(
                secret_key=self.config.privacy.obfuscation_key,
                mapping_file=self.config.privacy.obfuscation_mapping_file
            )
            logger.info("Path obfuscation enabled")

        # Embeddings priority: explicit function > local HuggingFace > Gemini API
        if embedding_function:
            self.embedding_function = embedding_function
        elif provider in ("local", "huggingface"):
            # Use local embeddings - NO RATE LIMITS!
            from langchain_huggingface import HuggingFaceEmbeddings
            self.embedding_function = HuggingFaceEmbeddings(
                model_name="all-MiniLM-L6-v2",  # Fast & good quality
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'normalize_embeddings': True}
            )
            logger.info("Using LOCAL embeddings (no rate limits)")
        elif provider == "gemini":
            api_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not api_key:
                raise ValueError("Google API Key is required for Gemini Embeddings")
            self.embedding_function = GoogleGenerativeAIEmbeddings(
                model="models/gemini-embedding-001",
                google_api_key=api_key
            )
            logger.info("Using Gemini embeddings (API rate limits apply)")
        else:
            raise ValueError(f"Unsupported embedding provider: {provider}. Use 'local', 'huggingface', or 'gemini'.")

    def clear_collection(self, collection_name: str = "codebase"):
        """Safely clear a collection from the Chroma vector database.

        A missing collection is treated as success; client-level failures
        are logged as warnings, never raised.
        """
        try:
            client = get_chroma_client(self.persist_directory)
            try:
                client.delete_collection(collection_name)
                logger.info(f"Deleted collection '{collection_name}'")
            except ValueError:
                # Collection doesn't exist
                pass
        except Exception as e:
            logger.warning(f"Failed to clear collection: {e}")

    def index_documents(self, documents: List[Document], collection_name: str = "codebase", vector_db_type: str = "chroma"):
        """Split documents structurally, embed them and store in a vector DB.

        Supports 'chroma' (batched, with automatic FAISS fallback on
        corruption), 'faiss' and 'qdrant' (both built in bulk).

        Args:
            documents: Documents to index; metadata must contain 'file_path'.
            collection_name: Target collection / index name.
            vector_db_type: 'chroma', 'faiss' or 'qdrant'.

        Returns:
            The vector store instance, or None when there was nothing to index.

        Raises:
            ValueError: On an unsupported ``vector_db_type``.
        """
        if not documents:
            logger.warning("No documents to index.")
            return

        all_chunks = []
        for doc in documents:
            # chunker.chunk returns List[Document]
            file_chunks = self.chunker.chunk(doc.page_content, doc.metadata["file_path"])
            all_chunks.extend(file_chunks)

        # BUGFIX: this was a no-op `pass`, which let an empty chunk list
        # fall through to FAISS.from_documents (which raises on empty input).
        if not all_chunks:
            logger.warning("Chunking produced no chunks; nothing to index.")
            return None

        # Filter out complex metadata and potential None values that slip through
        from langchain_community.vectorstores.utils import filter_complex_metadata

        for doc in all_chunks:
            # Double check for None values in metadata values and remove them
            doc.metadata = {k: v for k, v in doc.metadata.items() if v is not None}

        all_chunks = filter_complex_metadata(all_chunks)

        # Attempt indexing with fallback support
        attempted_db = vector_db_type
        fallback_triggered = False

        try:
            if vector_db_type == "chroma":
                # Use shared client to avoid "different settings" error
                chroma_client = get_chroma_client(self.persist_directory)
                vectordb = Chroma(
                    client=chroma_client,
                    embedding_function=self.embedding_function,
                    collection_name=collection_name
                )
            elif vector_db_type == "faiss":
                vectordb = None  # Built in bulk below
            elif vector_db_type == "qdrant":
                vectordb = None  # Built in bulk below
            else:
                raise ValueError(f"Unsupported Vector DB: {vector_db_type}")
        except Exception as e:
            error_str = str(e).lower()
            is_chroma_error = any(indicator in error_str for indicator in [
                'tenant', 'default_tenant', 'sqlite', 'corrupt',
                'no such table', 'locked', 'database'
            ])

            if is_chroma_error and vector_db_type == "chroma":
                logger.warning(f"Chroma indexing failed: {e}. Falling back to FAISS...")
                fallback_triggered = True
                attempted_db = "faiss"
                # Reset the (possibly corrupted) shared chroma client
                reset_chroma_clients()
                vectordb = None  # Will use FAISS path below
            else:
                raise

        import time

        # FAISS builds the whole index at once (no incremental batching)
        if vector_db_type == "faiss" or (fallback_triggered and attempted_db == "faiss"):
            from langchain_community.vectorstores import FAISS
            logger.info(f"Indexing with FAISS (fallback={fallback_triggered})...")
            vectordb = FAISS.from_documents(all_chunks, self.embedding_function)
            vectordb.save_local(folder_path=self.persist_directory, index_name=collection_name)
            set_active_vector_db("faiss")
            logger.info(f"Saved FAISS index to {self.persist_directory}/{collection_name}")
            return vectordb

        if vector_db_type == "qdrant":
            from langchain_qdrant import QdrantVectorStore

            url = os.getenv("QDRANT_URL")
            api_key = os.getenv("QDRANT_API_KEY")

            if url:
                vectordb = QdrantVectorStore.from_documents(
                    documents=all_chunks,
                    embedding=self.embedding_function,
                    url=url,
                    api_key=api_key,
                    collection_name=collection_name,
                    prefer_grpc=True
                )
            else:
                # BUGFIX: the local fallback previously assigned
                # `location = ":memory:"` but never passed it, still
                # connecting with url=None.
                logger.info("No QDRANT_URL found, using local Qdrant memory/disk")
                vectordb = QdrantVectorStore.from_documents(
                    documents=all_chunks,
                    embedding=self.embedding_function,
                    location=":memory:",
                    collection_name=collection_name
                )
            return vectordb

        # Chroma path: add in small batches with retry to respect rate limits
        batch_size = 20  # Reduced for free tier rate limits
        total_chunks = len(all_chunks)
        logger.info(f"Indexing {total_chunks} chunks in batches of {batch_size}...")

        for i in range(0, total_chunks, batch_size):
            batch = all_chunks[i:i + batch_size]
            # Retry logic for rate limits
            max_retries = 5
            for retry in range(max_retries):
                try:
                    vectordb.add_documents(documents=batch)
                    logger.info(f"Indexed batch {i // batch_size + 1}/{(total_chunks + batch_size - 1) // batch_size}")
                    # Delay to avoid rate limits (free tier is ~15 req/min)
                    time.sleep(4)  # 4 seconds between batches = ~15/min
                    break
                except Exception as e:
                    error_str = str(e).lower()
                    if 'rate' in error_str or '429' in error_str or 'quota' in error_str or 'resource_exhausted' in error_str:
                        wait_time = 30 * (retry + 1)  # 30s, 60s, 90s, 120s, 150s
                        logger.warning(f"Rate limit hit, waiting {wait_time}s... (retry {retry+1}/{max_retries})")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"Error indexing batch {i}: {e}")
                        break
            else:
                # All retries exhausted on rate limits; this batch is skipped
                # (previously this was silent data loss — now at least logged).
                logger.error(f"Giving up on batch {i} after {max_retries} rate-limit retries")

        # PersistentClient auto-persists
        logger.info(f"Indexed {len(all_chunks)} chunks into collection '{collection_name}' at {self.persist_directory}")
        return vectordb

    def get_retriever(self, collection_name: str = "codebase", k: int = 10, vector_db_type: str = "chroma"):
        """Get a retriever for the specified collection with automatic fallback.

        When the primary vector database fails, automatically attempts the next
        database in the fallback order (chroma -> faiss).

        Args:
            collection_name: Name of the collection to retrieve from
            k: Number of results to return (default 10)
            vector_db_type: Primary vector database type to try

        Returns:
            Configured retriever with fallback protection

        Raises:
            RuntimeError: When every attempted database failed.
            ValueError: When no valid database was available at all.
        """
        logger.info(f"Creating retriever for collection '{collection_name}' from {self.persist_directory}")

        # Track attempts so a circular fallback chain cannot loop forever
        attempted_dbs = []
        last_error = None
        current_db = vector_db_type

        while current_db and current_db not in attempted_dbs:
            attempted_dbs.append(current_db)

            try:
                vector_store = self._create_vector_store(current_db, collection_name)

                if vector_store:
                    # Success! Update active DB and return retriever
                    set_active_vector_db(current_db)
                    retriever = vector_store.as_retriever(search_kwargs={"k": k})
                    logger.info(f"Retriever created with k={k} using {current_db}")
                    return retriever

            except Exception as e:
                last_error = e
                error_str = str(e).lower()

                # Check if this is a recoverable error that warrants fallback
                is_chroma_error = any(indicator in error_str for indicator in [
                    'tenant', 'default_tenant', 'sqlite', 'corrupt',
                    'no such table', 'locked', 'database'
                ])

                if is_chroma_error or 'chroma' in error_str:
                    logger.warning(f"Vector DB '{current_db}' failed: {e}")

                    # Try next fallback
                    next_db = get_next_fallback_db(current_db)
                    if next_db:
                        logger.info(f"Attempting fallback to '{next_db}'...")
                        current_db = next_db
                        continue

                # Non-recoverable error
                logger.error(f"Vector DB '{current_db}' failed with non-recoverable error: {e}")
                break

        # All fallbacks exhausted
        if last_error:
            raise RuntimeError(
                f"All vector database options failed. Attempted: {attempted_dbs}. "
                f"Last error: {last_error}"
            )
        else:
            raise ValueError(f"No valid vector database available. Attempted: {attempted_dbs}")

    def _create_vector_store(self, vector_db_type: str, collection_name: str):
        """Create a vector store instance for the given database type.

        Args:
            vector_db_type: Type of vector database (chroma, faiss, qdrant)
            collection_name: Name of the collection

        Returns:
            Vector store instance

        Raises:
            ValueError: On an unsupported ``vector_db_type``.
            FileNotFoundError: When no FAISS index exists on disk.
            Exception: If vector store creation/verification fails.
        """
        if vector_db_type == "chroma":
            # Use shared client to avoid "different settings" error
            chroma_client = get_chroma_client(self.persist_directory)

            # Load existing vector store
            vector_store = Chroma(
                client=chroma_client,
                collection_name=collection_name,
                embedding_function=self.embedding_function,
            )

            # Verify the store works by getting count
            try:
                collection = vector_store._collection
                count = collection.count()
                logger.info(f"Collection '{collection_name}' has {count} documents")

                if count == 0:
                    logger.warning(f"Chroma collection '{collection_name}' is empty!")

            except Exception as e:
                # Re-raise to trigger fallback
                raise RuntimeError(f"Chroma verification failed: {e}")

            return vector_store

        elif vector_db_type == "faiss":
            from langchain_community.vectorstores import FAISS

            faiss_index_path = os.path.join(self.persist_directory, f"{collection_name}.faiss")
            faiss_pkl_path = os.path.join(self.persist_directory, f"{collection_name}.pkl")

            # Check if FAISS index exists; fall back to default naming convention
            if not os.path.exists(faiss_index_path) and not os.path.exists(faiss_pkl_path):
                faiss_index_path = os.path.join(self.persist_directory, "index.faiss")
                faiss_pkl_path = os.path.join(self.persist_directory, "index.pkl")

            if not os.path.exists(faiss_index_path):
                logger.warning(f"No FAISS index found at {self.persist_directory}, will need to re-index")
                # Raise so the caller can try the next fallback or re-index
                raise FileNotFoundError(f"FAISS index not found at {self.persist_directory}")

            vector_store = FAISS.load_local(
                folder_path=self.persist_directory,
                embeddings=self.embedding_function,
                index_name=collection_name,
                allow_dangerous_deserialization=True
            )
            logger.info(f"Loaded FAISS index from {self.persist_directory}")
            return vector_store

        elif vector_db_type == "qdrant":
            from langchain_qdrant import QdrantVectorStore

            url = os.getenv("QDRANT_URL")
            api_key = os.getenv("QDRANT_API_KEY")

            # NOTE(review): client=None with url/api_key relies on the
            # constructor building its own client — confirm against the
            # installed langchain-qdrant version.
            vector_store = QdrantVectorStore(
                client=None,
                collection_name=collection_name,
                embedding=self.embedding_function,
                url=url,
                api_key=api_key,
            )
            logger.info(f"Connected to Qdrant at {url}")
            return vector_store

        else:
            raise ValueError(f"Unsupported Vector DB: {vector_db_type}")

    def get_retriever_with_reindex_fallback(
        self,
        documents: List[Document] = None,
        collection_name: str = "codebase",
        k: int = 10,
        vector_db_type: str = "chroma"
    ):
        """Get retriever with automatic re-indexing fallback.

        If the primary vector DB fails and fallback also fails to load,
        this method will automatically re-index the documents using
        the fallback database.

        Args:
            documents: Documents to re-index if needed (optional)
            collection_name: Collection name
            k: Number of results
            vector_db_type: Primary DB type

        Returns:
            Configured retriever
        """
        try:
            return self.get_retriever(collection_name, k, vector_db_type)
        except (RuntimeError, FileNotFoundError) as e:
            if documents:
                logger.warning(f"Retriever creation failed, attempting re-index with fallback DB: {e}")

                # Get fallback DB
                fallback_db = get_next_fallback_db(vector_db_type) or "faiss"

                # Re-index with fallback
                logger.info(f"Re-indexing {len(documents)} documents with {fallback_db}...")
                self.index_documents(documents, collection_name, fallback_db)

                # Try getting retriever again
                return self.get_retriever(collection_name, k, fallback_db)
            else:
                raise

# Add incremental indexing methods to the Indexer class.
# The decorator-style call rebinds the module-level `Indexer` name, so any
# downstream `from ... import Indexer` gets the augmented class. The import
# is placed here (after the class definition) rather than at the top of the
# file — presumably to avoid a circular import with the ingestion package;
# TODO confirm.
from code_chatbot.ingestion.incremental_indexing import add_incremental_indexing_methods
Indexer = add_incremental_indexing_methods(Indexer)