Spaces:
Running
Running
Asish Karthikeya Gogineni
feat: Add Groq-optimized prompts, vector DB fallback, improved ignore patterns
511ccc3 | import os | |
| from typing import List, Optional | |
| from pathlib import Path | |
| from langchain_core.documents import Document | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from code_chatbot.chunker import StructuralChunker | |
| from code_chatbot.merkle_tree import MerkleTree, ChangeSet | |
| from code_chatbot.path_obfuscator import PathObfuscator | |
| from code_chatbot.config import get_config | |
| import shutil | |
| import logging | |
| logger = logging.getLogger(__name__) | |
# Backends to try, in priority order. When the backend at position i fails,
# the one at position i + 1 is attempted next.
VECTOR_DB_FALLBACK_ORDER = ["chroma", "faiss"]

# Mutable module-level state describing which backend is currently in use.
# "type" is read and written by get_active_vector_db / set_active_vector_db.
_active_vector_db = dict(type="chroma", fallback_count=0)
def get_active_vector_db() -> str:
    """Return the type name of the vector database currently marked active."""
    active_type = _active_vector_db["type"]
    return active_type
def set_active_vector_db(db_type: str):
    """Record ``db_type`` as the active vector database and log the change.

    Args:
        db_type: Vector database type name to mark as active.
    """
    _active_vector_db.update(type=db_type)
    logger.info(f"Active vector database set to: {db_type}")
def get_next_fallback_db(current_db: str) -> Optional[str]:
    """Look up the backend that follows ``current_db`` in the fallback order.

    Args:
        current_db: Vector database type that just failed.

    Returns:
        The next entry of ``VECTOR_DB_FALLBACK_ORDER``, or None when
        ``current_db`` is the last entry or is not listed at all.
    """
    # Unknown backends have no defined successor.
    if current_db not in VECTOR_DB_FALLBACK_ORDER:
        return None

    next_position = VECTOR_DB_FALLBACK_ORDER.index(current_db) + 1
    if next_position >= len(VECTOR_DB_FALLBACK_ORDER):
        return None
    return VECTOR_DB_FALLBACK_ORDER[next_position]
# Shared ChromaDB clients, keyed by persist directory. Reusing one client per
# path avoids ChromaDB's "different settings" error.
_chroma_clients = dict()


def reset_chroma_clients():
    """Drop every cached ChromaDB client.

    Intended to be called when database corruption is detected, so that the
    next get_chroma_client() call builds a fresh client.
    """
    global _chroma_clients
    # Rebind to a new dict (rather than clearing in place) so the previous
    # mapping object is left untouched for anyone still holding it.
    _chroma_clients = dict()
    logger.info("Reset ChromaDB client cache")
def get_chroma_client(persist_directory: str):
    """Get or create the shared ChromaDB client for ``persist_directory``.

    One client is cached per directory. On first use the client is created
    and verified with ``heartbeat()``. If creation or verification fails,
    the directory is wiped and a fresh client is created once. The client is
    stored in the cache only after it has been obtained successfully, so a
    failed attempt never leaves a broken client behind for later callers.

    Args:
        persist_directory: Directory in which ChromaDB persists its data.
            Created if it does not exist.

    Returns:
        The cached (or newly created) ``chromadb.PersistentClient``.

    Raises:
        Exception: Whatever the recovery attempt raised, if wiping the
            directory and recreating the client also fails.
    """
    global _chroma_clients

    # Ensure directory exists
    os.makedirs(persist_directory, exist_ok=True)

    if persist_directory in _chroma_clients:
        return _chroma_clients[persist_directory]

    import chromadb
    from chromadb.config import Settings

    def create_client():
        """Create a new persistent ChromaDB client for the directory."""
        return chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True,
            ),
        )

    def clear_and_recreate():
        """Delete the on-disk database and create a fresh client."""
        logger.warning(f"Clearing corrupted ChromaDB at {persist_directory} and recreating...")
        if os.path.exists(persist_directory):
            shutil.rmtree(persist_directory)
        os.makedirs(persist_directory, exist_ok=True)
        return create_client()

    def is_corruption_error(error: Exception) -> bool:
        """Check whether the error message looks like database corruption."""
        error_str = str(error).lower()
        corruption_indicators = (
            'tenant',  # "Could not connect to tenant default_tenant"
            'default_tenant',
            'sqlite',  # SQLite database issues
            'database',
            'corrupt',
            'no such table',
            'disk i/o error',
            'malformed',
            'locked',
        )
        return any(indicator in error_str for indicator in corruption_indicators)

    client = None
    try:
        client = create_client()
        # Verify the client works by attempting a simple operation
        client.heartbeat()
    except Exception as first_error:
        if client is None:
            logger.error(f"Failed to create ChromaDB client: {first_error}")
        else:
            logger.error(f"ChromaDB verification failed: {first_error}")
        if not is_corruption_error(first_error):
            logger.warning("Error does not look like corruption; attempting recovery anyway")

        # NOTE(review): recovery deletes persist_directory for ANY failure,
        # including transient ones such as a locked database. This is the
        # long-standing best-effort behaviour and is kept deliberately.
        client = None  # drop the failed client before removing its files
        try:
            client = clear_and_recreate()
        except Exception as recovery_error:
            logger.error(f"Recovery also failed: {recovery_error}")
            raise

    # Cache only a client that was obtained successfully.
    _chroma_clients[persist_directory] = client
    return client
class Indexer:
    """
    Indexes code files into a Vector Database.

    Documents are split with StructuralChunker, embedded with the configured
    embedding function, and stored in Chroma, FAISS or Qdrant. Chroma
    failures that look like database problems fall back to FAISS.
    """

    # Lower-case substrings of an error message that mark a failure as a
    # Chroma/database problem for which a fallback backend should be tried.
    _CHROMA_ERROR_INDICATORS = (
        'tenant', 'default_tenant', 'sqlite', 'corrupt',
        'no such table', 'locked', 'database',
    )

    # Lower-case substrings of an error message that indicate API rate limiting.
    _RATE_LIMIT_INDICATORS = ('rate', '429', 'quota', 'resource_exhausted')

    def __init__(self, persist_directory: str = "chroma_db", embedding_function=None, provider: str = "gemini", api_key: Optional[str] = None):
        """Set up chunking, change detection, path obfuscation and embeddings.

        Args:
            persist_directory: Directory where the vector database is stored.
            embedding_function: Ready-made embedding object. When given,
                ``provider`` and ``api_key`` are not used to build one.
            provider: Embedding provider: 'local', 'huggingface' or 'gemini'.
            api_key: Google API key for Gemini; falls back to the
                GOOGLE_API_KEY environment variable.

        Raises:
            ValueError: If Gemini is selected without an API key, or the
                provider is not supported.
        """
        self.persist_directory = persist_directory
        self.provider = provider

        # Load configuration
        self.config = get_config()

        # Initialize Structural Chunker
        self.chunker = StructuralChunker(max_tokens=self.config.chunking.max_chunk_tokens)

        # Initialize Merkle tree for change detection
        self.merkle_tree = MerkleTree(ignore_patterns=self.config.indexing.ignore_patterns)

        # Initialize path obfuscator if enabled
        self.path_obfuscator: Optional[PathObfuscator] = None
        if self.config.privacy.enable_path_obfuscation:
            self.path_obfuscator = PathObfuscator(
                secret_key=self.config.privacy.obfuscation_key,
                mapping_file=self.config.privacy.obfuscation_mapping_file
            )
            logger.info("Path obfuscation enabled")

        # Setup Embeddings - supports Gemini (API) and local HuggingFace
        if embedding_function:
            self.embedding_function = embedding_function
        elif provider in ("local", "huggingface"):
            # Use local embeddings - NO RATE LIMITS!
            from langchain_huggingface import HuggingFaceEmbeddings
            self.embedding_function = HuggingFaceEmbeddings(
                model_name="all-MiniLM-L6-v2",  # Fast & good quality
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'normalize_embeddings': True}
            )
            logger.info("Using LOCAL embeddings (no rate limits)")
        elif provider == "gemini":
            api_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not api_key:
                raise ValueError("Google API Key is required for Gemini Embeddings")
            self.embedding_function = GoogleGenerativeAIEmbeddings(
                model="models/gemini-embedding-001",
                google_api_key=api_key
            )
            logger.info("Using Gemini embeddings (API rate limits apply)")
        else:
            raise ValueError(f"Unsupported embedding provider: {provider}. Use 'local', 'huggingface', or 'gemini'.")

    @classmethod
    def _is_recoverable_db_error(cls, error: Exception) -> bool:
        """Return True if the error message matches a known Chroma/database failure."""
        message = str(error).lower()
        return any(indicator in message for indicator in cls._CHROMA_ERROR_INDICATORS)

    def clear_collection(self, collection_name: str = "codebase"):
        """
        Safely clears a collection from the Chroma vector database.

        Best-effort: a missing collection is ignored and any other failure
        is logged as a warning instead of being raised.

        Args:
            collection_name: Name of the collection to delete.
        """
        try:
            client = get_chroma_client(self.persist_directory)
            try:
                client.delete_collection(collection_name)
                logger.info(f"Deleted collection '{collection_name}'")
            except ValueError:
                # Collection doesn't exist
                pass
        except Exception as e:
            logger.warning(f"Failed to clear collection: {e}")

    def index_documents(self, documents: List["Document"], collection_name: str = "codebase", vector_db_type: str = "chroma"):
        """
        Splits documents structurally and generates embeddings.

        Supports 'chroma', 'faiss' and 'qdrant'. If opening Chroma fails with
        a database-looking error, indexing falls back to FAISS.

        Args:
            documents: Documents to index; each must carry a "file_path"
                metadata entry, which is passed to the chunker.
            collection_name: Collection (or FAISS index) name.
            vector_db_type: Backend to index into.

        Returns:
            The populated vector store, or None when there is nothing to
            index (no documents, or chunking produced no chunks).

        Raises:
            ValueError: If ``vector_db_type`` is not supported.
        """
        if not documents:
            logger.warning("No documents to index.")
            return

        all_chunks = []
        for doc in documents:
            # chunker.chunk returns List[Document]
            all_chunks.extend(self.chunker.chunk(doc.page_content, doc.metadata["file_path"]))

        # Filter out complex metadata and None values that slip through
        from langchain_community.vectorstores.utils import filter_complex_metadata
        for chunk in all_chunks:
            chunk.metadata = {k: v for k, v in chunk.metadata.items() if v is not None}
        all_chunks = filter_complex_metadata(all_chunks)

        # Nothing to embed: bail out instead of handing an empty list to the
        # backends (FAISS cannot build an index from zero documents).
        if not all_chunks:
            logger.warning("Chunking produced no chunks; nothing to index.")
            return

        if vector_db_type not in ("chroma", "faiss", "qdrant"):
            raise ValueError(f"Unsupported Vector DB: {vector_db_type}")

        # Attempt to open Chroma, with fallback support
        target_db = vector_db_type
        fallback_triggered = False
        vectordb = None
        if vector_db_type == "chroma":
            try:
                # Use shared client to avoid "different settings" error
                chroma_client = get_chroma_client(self.persist_directory)
                vectordb = Chroma(
                    client=chroma_client,
                    embedding_function=self.embedding_function,
                    collection_name=collection_name
                )
            except Exception as e:
                if not self._is_recoverable_db_error(e):
                    raise
                logger.warning(f"Chroma indexing failed: {e}. Falling back to FAISS...")
                fallback_triggered = True
                target_db = "faiss"
                # Forget the cached (possibly corrupted) Chroma clients
                reset_chroma_clients()

        # Batch processing - smaller batches to avoid rate limits
        batch_size = 20  # Reduced for free tier rate limits
        logger.info(f"Indexing {len(all_chunks)} chunks in batches of {batch_size}...")

        if target_db == "faiss":
            return self._index_with_faiss(all_chunks, collection_name, fallback_triggered)
        if target_db == "qdrant":
            return self._index_with_qdrant(all_chunks, collection_name)
        return self._index_with_chroma(vectordb, all_chunks, collection_name, batch_size)

    def _index_with_faiss(self, chunks, collection_name: str, fallback_triggered: bool):
        """Build a FAISS index from all chunks at once and save it to disk."""
        from langchain_community.vectorstores import FAISS

        # For FAISS, it's faster to just do it all at once or in big batches
        logger.info(f"Indexing with FAISS (fallback={fallback_triggered})...")
        vectordb = FAISS.from_documents(chunks, self.embedding_function)
        vectordb.save_local(folder_path=self.persist_directory, index_name=collection_name)
        set_active_vector_db("faiss")
        logger.info(f"Saved FAISS index to {self.persist_directory}/{collection_name}")
        return vectordb

    def _index_with_qdrant(self, chunks, collection_name: str):
        """Index all chunks into Qdrant (remote if QDRANT_URL is set, else in-memory)."""
        from langchain_qdrant import QdrantVectorStore

        url = os.getenv("QDRANT_URL")
        if url:
            connection = {
                "url": url,
                "api_key": os.getenv("QDRANT_API_KEY"),
                "prefer_grpc": True,
            }
        else:
            # Fallback to local: actually pass the in-memory location instead
            # of sending url=None to the client.
            logger.info("No QDRANT_URL found, using local Qdrant memory/disk")
            connection = {"location": ":memory:"}

        return QdrantVectorStore.from_documents(
            documents=chunks,
            embedding=self.embedding_function,
            collection_name=collection_name,
            **connection
        )

    def _index_with_chroma(self, vectordb, chunks, collection_name: str, batch_size: int):
        """Add chunks to Chroma in batches, retrying on rate limits.

        Best-effort: a batch that fails is logged and skipped, and the final
        log line reports how many chunks were actually stored.
        """
        import time

        total_chunks = len(chunks)
        total_batches = (total_chunks + batch_size - 1) // batch_size
        max_retries = 5
        indexed_chunks = 0
        failed_batches = 0

        for batch_number, start in enumerate(range(0, total_chunks, batch_size), start=1):
            batch = chunks[start:start + batch_size]
            succeeded = False

            # Retry logic for rate limits
            for retry in range(max_retries):
                try:
                    vectordb.add_documents(documents=batch)
                except Exception as e:
                    error_str = str(e).lower()
                    if not any(marker in error_str for marker in self._RATE_LIMIT_INDICATORS):
                        logger.error(f"Error indexing batch {batch_number}/{total_batches}: {e}")
                        break
                    if retry + 1 >= max_retries:
                        # No point sleeping when no further attempt follows.
                        logger.error(
                            f"Rate limit persisted after {max_retries} retries; "
                            f"giving up on batch {batch_number}/{total_batches}"
                        )
                        break
                    wait_time = 30 * (retry + 1)  # 30s, 60s, 90s, 120s
                    logger.warning(f"Rate limit hit, waiting {wait_time}s... (retry {retry+1}/{max_retries})")
                    time.sleep(wait_time)
                else:
                    succeeded = True
                    logger.info(f"Indexed batch {batch_number}/{total_batches}")
                    break

            if succeeded:
                indexed_chunks += len(batch)
                # Delay to avoid rate limits (free tier is ~15 req/min);
                # not needed once the final batch is done.
                if batch_number < total_batches:
                    time.sleep(4)  # 4 seconds between batches = ~15/min
            else:
                failed_batches += 1

        # PersistentClient auto-persists
        if failed_batches:
            logger.warning(
                f"Indexed {indexed_chunks} of {total_chunks} chunks into collection "
                f"'{collection_name}' at {self.persist_directory}; {failed_batches} batch(es) failed"
            )
        else:
            logger.info(f"Indexed {total_chunks} chunks into collection '{collection_name}' at {self.persist_directory}")
        return vectordb

    def get_retriever(self, collection_name: str = "codebase", k: int = 10, vector_db_type: str = "chroma"):
        """Get a retriever for the specified collection with automatic fallback.

        When the primary vector database fails, automatically attempts the next
        database in the fallback order (chroma -> faiss).

        Args:
            collection_name: Name of the collection to retrieve from
            k: Number of results to return (default 10)
            vector_db_type: Primary vector database type to try

        Returns:
            Configured retriever with fallback protection

        Raises:
            RuntimeError: If every attempted database failed; the last
                failure is attached as the cause.
            ValueError: If no database could be attempted at all.
        """
        logger.info(f"Creating retriever for collection '{collection_name}' from {self.persist_directory}")

        # Track attempts for fallback
        attempted_dbs = []
        last_error = None
        current_db = vector_db_type

        while current_db and current_db not in attempted_dbs:
            attempted_dbs.append(current_db)
            try:
                vector_store = self._create_vector_store(current_db, collection_name)
                if vector_store is not None:
                    # Success! Update active DB and return retriever
                    set_active_vector_db(current_db)
                    retriever = vector_store.as_retriever(search_kwargs={"k": k})
                    logger.info(f"Retriever created with k={k} using {current_db}")
                    return retriever
            except Exception as e:
                last_error = e
                # Check if this is a recoverable error that warrants fallback
                if self._is_recoverable_db_error(e) or 'chroma' in str(e).lower():
                    logger.warning(f"Vector DB '{current_db}' failed: {e}")
                    # Try next fallback
                    next_db = get_next_fallback_db(current_db)
                    if next_db:
                        logger.info(f"Attempting fallback to '{next_db}'...")
                        current_db = next_db
                        continue
                # Non-recoverable error (or no fallback left)
                logger.error(f"Vector DB '{current_db}' failed with non-recoverable error: {e}")
                break

        # All fallbacks exhausted
        if last_error:
            raise RuntimeError(
                f"All vector database options failed. Attempted: {attempted_dbs}. "
                f"Last error: {last_error}"
            ) from last_error
        raise ValueError(f"No valid vector database available. Attempted: {attempted_dbs}")

    def _create_vector_store(self, vector_db_type: str, collection_name: str):
        """Create a vector store instance for the given database type.

        Args:
            vector_db_type: Type of vector database (chroma, faiss, qdrant)
            collection_name: Name of the collection

        Returns:
            Vector store instance

        Raises:
            RuntimeError: If the Chroma collection cannot be queried.
            FileNotFoundError: If no FAISS index exists on disk.
            ValueError: If ``vector_db_type`` is not supported.
        """
        if vector_db_type == "chroma":
            # Use shared client to avoid "different settings" error
            chroma_client = get_chroma_client(self.persist_directory)

            # Load existing vector store
            vector_store = Chroma(
                client=chroma_client,
                collection_name=collection_name,
                embedding_function=self.embedding_function,
            )

            # Verify the store works by getting count
            try:
                count = vector_store._collection.count()
                logger.info(f"Collection '{collection_name}' has {count} documents")
                if count == 0:
                    logger.warning(f"Chroma collection '{collection_name}' is empty!")
            except Exception as e:
                # Re-raise to trigger fallback
                raise RuntimeError(f"Chroma verification failed: {e}") from e
            return vector_store

        if vector_db_type == "faiss":
            # Look for an index saved under the collection name first, then
            # under the default "index" name. Both the .faiss and .pkl files
            # are needed, and the name that was found is the one loaded.
            index_name = None
            for candidate in (collection_name, "index"):
                base_path = os.path.join(self.persist_directory, candidate)
                if os.path.exists(f"{base_path}.faiss") and os.path.exists(f"{base_path}.pkl"):
                    index_name = candidate
                    break

            if index_name is None:
                logger.warning(f"No FAISS index found at {self.persist_directory}, will need to re-index")
                raise FileNotFoundError(f"FAISS index not found at {self.persist_directory}")

            from langchain_community.vectorstores import FAISS

            # SECURITY: allow_dangerous_deserialization unpickles the .pkl
            # file. Only point persist_directory at indexes this application
            # wrote itself; never load an index from an untrusted source.
            vector_store = FAISS.load_local(
                folder_path=self.persist_directory,
                embeddings=self.embedding_function,
                index_name=index_name,
                allow_dangerous_deserialization=True
            )
            logger.info(f"Loaded FAISS index from {self.persist_directory}")
            return vector_store

        if vector_db_type == "qdrant":
            from langchain_qdrant import QdrantVectorStore

            url = os.getenv("QDRANT_URL")
            api_key = os.getenv("QDRANT_API_KEY")

            # The QdrantVectorStore constructor needs a ready client and does
            # not take connection arguments; from_existing_collection builds
            # the client from url/api_key.
            vector_store = QdrantVectorStore.from_existing_collection(
                collection_name=collection_name,
                embedding=self.embedding_function,
                url=url,
                api_key=api_key,
            )
            logger.info(f"Connected to Qdrant at {url}")
            return vector_store

        raise ValueError(f"Unsupported Vector DB: {vector_db_type}")

    def get_retriever_with_reindex_fallback(
        self,
        documents: Optional[List["Document"]] = None,
        collection_name: str = "codebase",
        k: int = 10,
        vector_db_type: str = "chroma"
    ):
        """Get retriever with automatic re-indexing fallback.

        If the primary vector DB fails and fallback also fails to load,
        this method will automatically re-index the documents using
        the fallback database.

        Args:
            documents: Documents to re-index if needed (optional)
            collection_name: Collection name
            k: Number of results
            vector_db_type: Primary DB type

        Returns:
            Configured retriever

        Raises:
            RuntimeError, FileNotFoundError: Re-raised from get_retriever
                when no documents were supplied for re-indexing.
        """
        try:
            return self.get_retriever(collection_name, k, vector_db_type)
        except (RuntimeError, FileNotFoundError) as e:
            if not documents:
                raise

            logger.warning(f"Retriever creation failed, attempting re-index with fallback DB: {e}")

            # Get fallback DB ("faiss" when the order has nothing after the primary)
            fallback_db = get_next_fallback_db(vector_db_type) or "faiss"

            # Re-index with fallback
            logger.info(f"Re-indexing {len(documents)} documents with {fallback_db}...")
            self.index_documents(documents, collection_name, fallback_db)

            # Try getting retriever again
            return self.get_retriever(collection_name, k, fallback_db)
# Extend Indexer with the incremental indexing methods. The helper receives
# the class and its return value replaces the module-level Indexer name.
# NOTE(review): the import sits after the class definition — presumably to
# avoid a circular import with code_chatbot.incremental_indexing; confirm.
from code_chatbot.incremental_indexing import add_incremental_indexing_methods

Indexer = add_incremental_indexing_methods(Indexer)