# Asish Karthikeya Gogineni
# Refactor: Code Structure Update & UI Redesign
# a3bdcf1
import os
from typing import List, Optional
from pathlib import Path
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from code_chatbot.ingestion.chunker import StructuralChunker
from code_chatbot.ingestion.merkle_tree import MerkleTree, ChangeSet
from code_chatbot.core.path_obfuscator import PathObfuscator
from code_chatbot.core.config import get_config
import shutil
import logging
logger = logging.getLogger(__name__)
from code_chatbot.core.db_connection import (
get_chroma_client,
reset_chroma_clients,
set_active_vector_db,
get_next_fallback_db,
VECTOR_DB_FALLBACK_ORDER
)
class Indexer:
    """
    Indexes code files into a Vector Database.
    Now uses StructuralChunker for semantic splitting.

    Documents are chunked, their metadata is sanitised, and the chunks are
    embedded into one of the supported stores ('chroma', 'faiss', 'qdrant').
    When Chroma cannot be opened because of a storage-level error, indexing
    and retrieval fall back to the next configured database.
    """

    # Lower-case substrings of an error message that indicate a broken or
    # locked Chroma/SQLite store, i.e. an error worth a fallback attempt.
    _RECOVERABLE_DB_ERROR_MARKERS = (
        'tenant', 'default_tenant', 'sqlite', 'corrupt',
        'no such table', 'locked', 'database',
    )
    # Lower-case substrings that identify an embedding API rate-limit error.
    _RATE_LIMIT_MARKERS = ('rate', '429', 'quota', 'resource_exhausted')
    # Providers that embed locally and therefore need no request throttling.
    _LOCAL_PROVIDERS = ("local", "huggingface")

    # Chroma ingestion tuning (sized for the Gemini free tier, ~15 req/min).
    _BATCH_SIZE = 20
    _MAX_RETRIES = 5
    _BATCH_DELAY_SECONDS = 4
    _RETRY_BACKOFF_SECONDS = 30  # multiplied by the attempt number: 30s, 60s, ...

    def __init__(self, persist_directory: Optional[str] = None, embedding_function=None,
                 provider: str = "gemini", api_key: Optional[str] = None):
        """Set up storage location, chunker, change detection and embeddings.

        Args:
            persist_directory: Directory holding the vector DB files. Defaults
                to ``<system temp dir>/vector_db``.
            embedding_function: Pre-built embedding object. When given, it is
                used as-is and ``provider``/``api_key`` are not consulted for
                building embeddings.
            provider: 'local' / 'huggingface' for local embeddings, or
                'gemini' for the Google API.
            api_key: Google API key; falls back to the ``GOOGLE_API_KEY``
                environment variable.

        Raises:
            ValueError: If the provider is unsupported, or Gemini is selected
                without an API key.
        """
        # Use /tmp for Hugging Face compatibility (they only allow writes to /tmp)
        import tempfile
        self.persist_directory = persist_directory or os.path.join(tempfile.gettempdir(), "vector_db")
        os.makedirs(self.persist_directory, exist_ok=True)
        self.provider = provider

        # Load configuration
        self.config = get_config()

        # Initialize Structural Chunker
        self.chunker = StructuralChunker(max_tokens=self.config.chunking.max_chunk_tokens)

        # Initialize Merkle tree for change detection
        self.merkle_tree = MerkleTree(ignore_patterns=self.config.indexing.ignore_patterns)

        # Initialize path obfuscator if enabled
        self.path_obfuscator: Optional[PathObfuscator] = None
        if self.config.privacy.enable_path_obfuscation:
            self.path_obfuscator = PathObfuscator(
                secret_key=self.config.privacy.obfuscation_key,
                mapping_file=self.config.privacy.obfuscation_mapping_file
            )
            logger.info("Path obfuscation enabled")

        # Setup Embeddings - supports Gemini (API) and local HuggingFace
        if embedding_function:
            self.embedding_function = embedding_function
        elif provider in self._LOCAL_PROVIDERS:
            # Use local embeddings - NO RATE LIMITS!
            from langchain_huggingface import HuggingFaceEmbeddings
            self.embedding_function = HuggingFaceEmbeddings(
                model_name="all-MiniLM-L6-v2",  # Fast & good quality
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'normalize_embeddings': True}
            )
            logger.info("Using LOCAL embeddings (no rate limits)")
        elif provider == "gemini":
            api_key = api_key or os.getenv("GOOGLE_API_KEY")
            if not api_key:
                raise ValueError("Google API Key is required for Gemini Embeddings")
            self.embedding_function = GoogleGenerativeAIEmbeddings(
                model="models/gemini-embedding-001",
                google_api_key=api_key
            )
            logger.info("Using Gemini embeddings (API rate limits apply)")
        else:
            raise ValueError(f"Unsupported embedding provider: {provider}. Use 'local', 'huggingface', or 'gemini'.")

    @classmethod
    def _is_recoverable_db_error(cls, error) -> bool:
        """Return True when the error text looks like a Chroma/SQLite storage failure."""
        message = str(error).lower()
        return any(marker in message for marker in cls._RECOVERABLE_DB_ERROR_MARKERS)

    @classmethod
    def _is_rate_limit_error(cls, error) -> bool:
        """Return True when the error text looks like an API rate-limit/quota error."""
        message = str(error).lower()
        return any(marker in message for marker in cls._RATE_LIMIT_MARKERS)

    def _find_faiss_index_name(self, collection_name: str) -> Optional[str]:
        """Return the name of a complete FAISS index in the persist directory.

        An index is complete when both ``<name>.faiss`` and ``<name>.pkl``
        exist. The collection name is preferred; the default name ``index``
        is the fallback. Returns None when neither is complete.
        """
        for candidate in (collection_name, "index"):
            faiss_file = os.path.join(self.persist_directory, f"{candidate}.faiss")
            pkl_file = os.path.join(self.persist_directory, f"{candidate}.pkl")
            if os.path.exists(faiss_file) and os.path.exists(pkl_file):
                return candidate
        return None

    def clear_collection(self, collection_name: str = "codebase"):
        """
        Safely clears a collection from the vector database.

        Best-effort: a missing collection is ignored and any other failure is
        logged as a warning instead of being raised.
        """
        try:
            client = get_chroma_client(self.persist_directory)
            try:
                client.delete_collection(collection_name)
                logger.info(f"Deleted collection '{collection_name}'")
            except ValueError:
                # Collection doesn't exist
                pass
        except Exception as e:
            logger.warning(f"Failed to clear collection: {e}")

    def index_documents(self, documents: "List[Document]", collection_name: str = "codebase",
                        vector_db_type: str = "chroma"):
        """
        Splits documents structurally and generates embeddings.
        Supports 'chroma', 'faiss' and 'qdrant'.

        Args:
            documents: Source documents; each must carry ``metadata["file_path"]``.
            collection_name: Target collection / index name.
            vector_db_type: Primary vector database type.

        Returns:
            The populated vector store, or None when there is nothing to index.

        Raises:
            ValueError: If ``vector_db_type`` is unsupported.
        """
        if not documents:
            logger.warning("No documents to index.")
            return None

        all_chunks = []
        for doc in documents:
            # chunker.chunk returns List[Document]
            all_chunks.extend(self.chunker.chunk(doc.page_content, doc.metadata["file_path"]))

        if not all_chunks:
            # Nothing to embed; building a store from an empty list is pointless
            # (and some store builders cannot handle an empty list).
            logger.warning("Chunking produced no chunks; nothing to index.")
            return None

        # Filter out complex metadata and potential None values that slip through
        from langchain_community.vectorstores.utils import filter_complex_metadata
        for chunk in all_chunks:
            chunk.metadata = {k: v for k, v in chunk.metadata.items() if v is not None}
        all_chunks = filter_complex_metadata(all_chunks)

        if vector_db_type not in ("chroma", "faiss", "qdrant"):
            raise ValueError(f"Unsupported Vector DB: {vector_db_type}")

        # Attempt indexing with fallback support
        active_db = vector_db_type
        fallback_triggered = False
        vectordb = None
        if vector_db_type == "chroma":
            try:
                # Use shared client to avoid "different settings" error
                chroma_client = get_chroma_client(self.persist_directory)
                vectordb = Chroma(
                    client=chroma_client,
                    embedding_function=self.embedding_function,
                    collection_name=collection_name
                )
            except Exception as e:
                if not self._is_recoverable_db_error(e):
                    raise
                logger.warning(f"Chroma indexing failed: {e}. Falling back to FAISS...")
                fallback_triggered = True
                active_db = "faiss"
                # Drop the cached clients of the broken Chroma store
                reset_chroma_clients()

        if active_db == "faiss":
            from langchain_community.vectorstores import FAISS
            # For FAISS, it's faster to just do it all at once
            logger.info(f"Indexing with FAISS (fallback={fallback_triggered})...")
            vectordb = FAISS.from_documents(all_chunks, self.embedding_function)
            vectordb.save_local(folder_path=self.persist_directory, index_name=collection_name)
            set_active_vector_db("faiss")
            logger.info(f"Saved FAISS index to {self.persist_directory}/{collection_name}")
            return vectordb

        if active_db == "qdrant":
            from langchain_qdrant import QdrantVectorStore
            url = os.getenv("QDRANT_URL")
            if url:
                connection_kwargs = {
                    "url": url,
                    "api_key": os.getenv("QDRANT_API_KEY"),
                    "prefer_grpc": True,
                }
            else:
                # Fallback to local. An in-memory store lives only as long as
                # the returned object.
                logger.info("No QDRANT_URL found, using local Qdrant memory/disk")
                connection_kwargs = {"location": ":memory:"}
            return QdrantVectorStore.from_documents(
                documents=all_chunks,
                embedding=self.embedding_function,
                collection_name=collection_name,
                **connection_kwargs
            )

        # Chroma: small batches with retry to stay within API rate limits
        total_chunks = len(all_chunks)
        logger.info(f"Indexing {total_chunks} chunks in batches of {self._BATCH_SIZE}...")
        indexed = self._add_documents_in_batches(vectordb, all_chunks)
        if indexed < total_chunks:
            logger.warning(f"{total_chunks - indexed} of {total_chunks} chunks could not be indexed")

        # PersistentClient auto-persists
        logger.info(f"Indexed {indexed} chunks into collection '{collection_name}' at {self.persist_directory}")
        return vectordb

    def _add_documents_in_batches(self, vectordb, chunks) -> int:
        """Add chunks to the store batch by batch and return how many were stored.

        Rate-limit errors are retried with a linearly growing wait. Any other
        error, or running out of retries, skips that batch (best-effort) and
        is logged. Throttling between batches is skipped for local providers.
        """
        import time

        batch_size = self._BATCH_SIZE
        max_retries = self._MAX_RETRIES
        total = len(chunks)
        total_batches = (total + batch_size - 1) // batch_size
        batch_delay = 0 if self.provider in self._LOCAL_PROVIDERS else self._BATCH_DELAY_SECONDS

        indexed = 0
        for batch_no, start in enumerate(range(0, total, batch_size), start=1):
            batch = chunks[start:start + batch_size]
            for retry in range(max_retries):
                try:
                    vectordb.add_documents(documents=batch)
                except Exception as e:
                    if not self._is_rate_limit_error(e):
                        logger.error(f"Error indexing batch {batch_no}/{total_batches}: {e}")
                        break
                    if retry + 1 >= max_retries:
                        # No point in sleeping when no further attempt follows
                        logger.error(
                            f"Giving up on batch {batch_no}/{total_batches} after {max_retries} rate-limited attempts: {e}"
                        )
                        break
                    wait_time = self._RETRY_BACKOFF_SECONDS * (retry + 1)  # 30s, 60s, 90s, 120s
                    logger.warning(f"Rate limit hit, waiting {wait_time}s... (retry {retry+1}/{max_retries})")
                    time.sleep(wait_time)
                else:
                    indexed += len(batch)
                    logger.info(f"Indexed batch {batch_no}/{total_batches}")
                    # Delay to avoid rate limits (free tier is ~15 req/min);
                    # not needed after the final batch.
                    if batch_delay and batch_no < total_batches:
                        time.sleep(batch_delay)
                    break
        return indexed

    def get_retriever(self, collection_name: str = "codebase", k: int = 10, vector_db_type: str = "chroma"):
        """Get a retriever for the specified collection with automatic fallback.

        When the primary vector database fails, automatically attempts the next
        database in the fallback order (chroma -> faiss).

        Args:
            collection_name: Name of the collection to retrieve from
            k: Number of results to return (default 10)
            vector_db_type: Primary vector database type to try

        Returns:
            Configured retriever with fallback protection

        Raises:
            RuntimeError: If every attempted database failed (the last error
                is attached as the cause).
            ValueError: If no database produced a store and no error occurred.
        """
        logger.info(f"Creating retriever for collection '{collection_name}' from {self.persist_directory}")

        # Track attempts for fallback
        attempted_dbs = []
        last_error = None
        current_db = vector_db_type

        while current_db and current_db not in attempted_dbs:
            attempted_dbs.append(current_db)
            try:
                vector_store = self._create_vector_store(current_db, collection_name)
                # Identity check, not truthiness: a store type that defines
                # __len__ would be falsy while empty yet still perfectly valid.
                if vector_store is not None:
                    # Success! Update active DB and return retriever
                    set_active_vector_db(current_db)
                    retriever = vector_store.as_retriever(search_kwargs={"k": k})
                    logger.info(f"Retriever created with k={k} using {current_db}")
                    return retriever
                break
            except Exception as e:
                last_error = e
                # Check if this is a recoverable error that warrants fallback
                if self._is_recoverable_db_error(e) or 'chroma' in str(e).lower():
                    logger.warning(f"Vector DB '{current_db}' failed: {e}")
                    # Try next fallback
                    next_db = get_next_fallback_db(current_db)
                    if next_db:
                        logger.info(f"Attempting fallback to '{next_db}'...")
                        current_db = next_db
                        continue
                # Non-recoverable error
                logger.error(f"Vector DB '{current_db}' failed with non-recoverable error: {e}")
                break

        # All fallbacks exhausted
        if last_error:
            raise RuntimeError(
                f"All vector database options failed. Attempted: {attempted_dbs}. "
                f"Last error: {last_error}"
            ) from last_error
        raise ValueError(f"No valid vector database available. Attempted: {attempted_dbs}")

    def _create_vector_store(self, vector_db_type: str, collection_name: str):
        """Create a vector store instance for the given database type.

        Args:
            vector_db_type: Type of vector database (chroma, faiss, qdrant)
            collection_name: Name of the collection

        Returns:
            Vector store instance

        Raises:
            RuntimeError: If the Chroma collection cannot be verified.
            FileNotFoundError: If no complete FAISS index exists on disk.
            ValueError: If the database type is unsupported.
        """
        if vector_db_type == "chroma":
            # Use shared client to avoid "different settings" error
            chroma_client = get_chroma_client(self.persist_directory)

            # Load existing vector store
            vector_store = Chroma(
                client=chroma_client,
                collection_name=collection_name,
                embedding_function=self.embedding_function,
            )

            # Verify the store works by getting count
            try:
                count = vector_store._collection.count()
                logger.info(f"Collection '{collection_name}' has {count} documents")
                if count == 0:
                    logger.warning(f"Chroma collection '{collection_name}' is empty!")
            except Exception as e:
                # Re-raise to trigger fallback
                raise RuntimeError(f"Chroma verification failed: {e}") from e

            return vector_store

        if vector_db_type == "faiss":
            from langchain_community.vectorstores import FAISS

            # Load under the name the index was actually found with
            # (collection name first, then the default "index").
            index_name = self._find_faiss_index_name(collection_name)
            if index_name is None:
                logger.warning(f"No FAISS index found at {self.persist_directory}, will need to re-index")
                raise FileNotFoundError(f"FAISS index not found at {self.persist_directory}")

            # SECURITY: load_local unpickles the .pkl file. Only safe while the
            # persist directory contains files written by this application.
            vector_store = FAISS.load_local(
                folder_path=self.persist_directory,
                embeddings=self.embedding_function,
                index_name=index_name,
                allow_dangerous_deserialization=True
            )
            logger.info(f"Loaded FAISS index from {self.persist_directory}")
            return vector_store

        if vector_db_type == "qdrant":
            from langchain_qdrant import QdrantVectorStore

            url = os.getenv("QDRANT_URL")
            api_key = os.getenv("QDRANT_API_KEY")

            # Connection parameters go through the from_existing_collection
            # factory rather than the constructor.
            vector_store = QdrantVectorStore.from_existing_collection(
                embedding=self.embedding_function,
                collection_name=collection_name,
                url=url,
                api_key=api_key,
            )
            logger.info(f"Connected to Qdrant at {url}")
            return vector_store

        raise ValueError(f"Unsupported Vector DB: {vector_db_type}")

    def get_retriever_with_reindex_fallback(
        self,
        documents: "Optional[List[Document]]" = None,
        collection_name: str = "codebase",
        k: int = 10,
        vector_db_type: str = "chroma"
    ):
        """Get retriever with automatic re-indexing fallback.

        If the primary vector DB fails and fallback also fails to load,
        this method will automatically re-index the documents using
        the fallback database.

        Args:
            documents: Documents to re-index if needed (optional)
            collection_name: Collection name
            k: Number of results
            vector_db_type: Primary DB type

        Returns:
            Configured retriever

        Raises:
            RuntimeError, FileNotFoundError: Re-raised from ``get_retriever``
                when no documents were supplied for re-indexing.
        """
        try:
            return self.get_retriever(collection_name, k, vector_db_type)
        except (RuntimeError, FileNotFoundError) as e:
            if not documents:
                raise

            logger.warning(f"Retriever creation failed, attempting re-index with fallback DB: {e}")

            # Get fallback DB
            fallback_db = get_next_fallback_db(vector_db_type) or "faiss"

            # Re-index with fallback
            logger.info(f"Re-indexing {len(documents)} documents with {fallback_db}...")
            self.index_documents(documents, collection_name, fallback_db)

            # Try getting retriever again
            return self.get_retriever(collection_name, k, fallback_db)
# Add incremental indexing methods to the Indexer class.
# The import is placed at the bottom of the module, after Indexer is defined
# (presumably to avoid a circular import with incremental_indexing -- TODO confirm).
# NOTE(review): the name Indexer is rebound to whatever the helper returns, so
# importers of this module get that returned object, not necessarily the class
# object defined above -- verify against add_incremental_indexing_methods.
from code_chatbot.ingestion.incremental_indexing import add_incremental_indexing_methods
Indexer = add_incremental_indexing_methods(Indexer)