# Vector Store Service - Simple setup for retriever use
import json
import os
import shutil
from typing import List, Dict, Any, Optional
from pathlib import Path

# Core LangChain imports (always needed)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

# Local imports
from backend.config.settings import settings
from backend.config.database import db_settings
from backend.config.logging_config import get_logger

# MongoDB imports
from pymongo import MongoClient
from backend.services.custom_mongo_vector import CustomMongoDBVectorStore, VectorSearchOptions

# Setup logging
logger = get_logger("vector_store")


class VectorStoreService:
    """Simple vector store service - creates or retrieves vector store for retriever use"""

    def __init__(self):
        """Build embeddings and the backing vector store; raises on any setup failure."""
        logger.info("📚 Initializing Vector Store Service...")
        try:
            self.embeddings = self._get_embeddings()
            logger.info("✅ Embeddings setup completed")

            self.vector_store = self._get_or_create_vector_store()
            logger.info("✅ Vector store setup completed")

            logger.info("🚀 Vector Store Service initialization successful")
        except Exception as e:
            logger.error(f"❌ Vector Store Service initialization failed: {str(e)}", exc_info=True)
            raise

    def _get_embeddings(self):
        """Get embeddings provider based on configuration with conditional imports.

        Returns a LangChain-compatible embeddings object for the configured
        provider (openai/google/huggingface/ollama). Unknown providers fall
        back to OpenAI. Raises ImportError when the selected provider's
        package is not installed.
        """
        embedding_config = settings.get_embedding_config()
        provider = embedding_config["provider"]

        logger.info(f"🔧 Setting up embeddings provider: {provider}")

        if provider == "openai":
            try:
                from langchain_openai import OpenAIEmbeddings
                logger.info("✅ OpenAI embeddings imported successfully")
                return OpenAIEmbeddings(
                    openai_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"❌ OpenAI embeddings not available: {e}")
                raise ImportError("OpenAI provider selected but langchain_openai not installed")

        elif provider == "google":
            try:
                from langchain_google_genai import GoogleGenerativeAIEmbeddings
                logger.info("✅ Google embeddings imported successfully")
                return GoogleGenerativeAIEmbeddings(
                    google_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"❌ Google embeddings not available: {e}")
                raise ImportError("Google provider selected but langchain_google_genai not installed")

        elif provider == "huggingface":
            try:
                # Try modern langchain-huggingface first
                from langchain_huggingface import HuggingFaceEmbeddings
                logger.info("✅ HuggingFace embeddings imported successfully")
                return HuggingFaceEmbeddings(
                    model_name=embedding_config["model"]
                )
            except ImportError:
                try:
                    # Fallback to sentence-transformers directly
                    from sentence_transformers import SentenceTransformer
                    logger.warning("⚠️ Using sentence-transformers directly (langchain-huggingface not available)")
                    # Return a wrapper that mimics the embeddings interface
                    return self._create_sentence_transformer_wrapper(embedding_config["model"])
                except ImportError as e:
                    logger.error(f"❌ HuggingFace embeddings not available: {e}")
                    logger.error("💡 To fix this, install sentence-transformers: pip install sentence-transformers")
                    raise ImportError("HuggingFace provider selected but sentence-transformers not installed. Run: pip install sentence-transformers")

        elif provider == "ollama":
            try:
                from langchain_community.embeddings import OllamaEmbeddings
                logger.info("✅ Ollama embeddings imported successfully")
                return OllamaEmbeddings(
                    base_url=embedding_config["base_url"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"❌ Ollama embeddings not available: {e}")
                raise ImportError("Ollama provider selected but langchain_community not installed")

        else:
            logger.warning(f"⚠️ Unknown embedding provider '{provider}', falling back to OpenAI")
            try:
                from langchain_openai import OpenAIEmbeddings
                return OpenAIEmbeddings()
            except ImportError:
                logger.error("❌ No valid embedding provider available")
                raise ImportError("No valid embedding provider available")

    def _create_sentence_transformer_wrapper(self, model_name):
        """Create a simple wrapper for sentence-transformers to work with LangChain.

        LangChain vector stores call embed_documents()/embed_query(); the
        original wrapper only exposed encode(), which broke document
        indexing. embed_documents is now provided, with encode kept as a
        backward-compatible alias.
        """
        from sentence_transformers import SentenceTransformer

        class SentenceTransformerWrapper:
            def __init__(self, model_name):
                self.model = SentenceTransformer(model_name)

            def embed_documents(self, texts):
                # Batch-embed a list of texts; .tolist() converts the numpy
                # array to plain Python lists as LangChain expects.
                return self.model.encode(texts).tolist()

            # Backward-compatible alias for any caller using the old name.
            encode = embed_documents

            def embed_query(self, text):
                # Single-text embedding for query-time similarity search.
                return self.model.encode([text])[0].tolist()

        return SentenceTransformerWrapper(model_name)

    def _get_or_create_vector_store(self):
        """Get or create vector store with conditional imports.

        ChromaDB: loads an existing persisted store (optionally wiping it
        first when refresh_on_start is set) or builds a new one from local
        documents. MongoDB: connects to Atlas and wraps the configured
        collection in CustomMongoDBVectorStore. Unknown providers fall back
        to a local ChromaDB collection.
        """
        db_config = db_settings.get_vector_store_config()
        provider = db_config["provider"]

        if provider == "chromadb":
            try:
                from langchain_chroma import Chroma

                persist_dir = Path(db_config["persist_directory"])
                collection_name = db_config["collection_name"]
                refresh_on_start = db_config.get("refresh_on_start", False)

                # Check if refresh is requested
                if refresh_on_start and persist_dir.exists():
                    logger.info(f"🔄 CHROMADB_REFRESH_ON_START=true - Deleting existing ChromaDB at {persist_dir}")
                    shutil.rmtree(persist_dir)
                    logger.info(f"✅ Existing ChromaDB deleted successfully")

                # Check if persisted database exists
                if persist_dir.exists() and any(persist_dir.iterdir()):
                    logger.info(f"📂 Loading existing ChromaDB from {persist_dir}")
                    return Chroma(
                        collection_name=collection_name,
                        embedding_function=self.embeddings,
                        persist_directory=str(persist_dir)
                    )
                else:
                    # Create new vector store with documents
                    logger.info(f"🆕 Creating new ChromaDB at {persist_dir}")
                    documents = self._load_documents_from_folder()

                    if documents:
                        vector_store = Chroma.from_documents(
                            documents=documents,
                            embedding=self.embeddings,
                            collection_name=collection_name,
                            persist_directory=str(persist_dir)
                        )
                        logger.info(f"✅ Created ChromaDB with {len(documents)} document chunks")
                        return vector_store
                    else:
                        logger.info("📝 No documents found, creating empty ChromaDB")
                        return Chroma(
                            collection_name=collection_name,
                            embedding_function=self.embeddings,
                            persist_directory=str(persist_dir)
                        )
            except ImportError as e:
                logger.error(f"❌ ChromaDB not available: {e}")
                raise ImportError("ChromaDB provider selected but langchain_chroma not installed")

        elif provider == "mongodb":
            try:
                logger.info("🔗 Setting up MongoDB Atlas connection...")

                client = MongoClient(db_config["uri"])
                client.admin.command('ping')
                logger.info(f"✅ MongoDB Atlas connection verified")
                # Was a stray debug print; keep the information at debug level.
                logger.debug(f"Available databases: {client.list_database_names()}")

                # Get the collection
                database = client[db_config["database"]]
                collection = database[db_config["collection_name"]]

                # Create streamlined vector store with Atlas Vector Search
                options = VectorSearchOptions(
                    index_name=db_config.get("index_name", "vector_index"),
                    embedding_key=db_config.get("vector_field", "ingredients_emb"),
                    text_key="title",
                    num_candidates=db_config.get("num_candidates", 50),
                    similarity_metric=db_config.get("similarity_metric", "cosine")
                )

                vector_store = CustomMongoDBVectorStore(
                    collection=collection,
                    embedding_function=self.embeddings,
                    options=options
                )

                logger.info(f"✅ Custom MongoDB Vector Store created successfully")
                logger.info("🎯 Using pre-existing embeddings without requiring vector search index")
                return vector_store

            except ImportError as e:
                logger.error(f"❌ MongoDB packages not available: {e}")
                raise ImportError("MongoDB provider selected but langchain-mongodb not installed. Run: pip install langchain-mongodb pymongo")
            except Exception as e:
                logger.error(f"❌ MongoDB Atlas connection failed: {e}")
                raise ConnectionError(f"Failed to connect to MongoDB Atlas: {e}")

        else:
            logger.warning(f"⚠️ Unknown vector store provider '{provider}', falling back to ChromaDB")
            try:
                from langchain_chroma import Chroma
                return Chroma(
                    collection_name="fallback_collection",
                    embedding_function=self.embeddings,
                    persist_directory="./vector_store/fallback_chroma"
                )
            except ImportError:
                logger.error("❌ No valid vector store provider available")
                raise ImportError("No valid vector store provider available")

    def _load_documents_from_folder(self, folder_path: str = "./data/recipes") -> List[Document]:
        """Load and chunk all documents from folder with UTF-8 encoding, fallback to sample data.

        Recursively reads every non-empty file under folder_path, formats
        JSON files into readable recipe text, and splits everything into
        1000-char chunks (200 overlap). When folder_path has no usable
        files, falls back to ./data (where sample_recipes.json lives).
        Unreadable files are logged and skipped.
        """
        logger.info(f"📄 Loading documents from: {folder_path}")

        documents = []
        folder = Path(folder_path)

        # Check if folder exists and has files
        has_recipe_files = False
        if folder.exists():
            # Check if there are any files in the recipes folder
            recipe_files = list(folder.rglob("*"))
            has_recipe_files = any(f.is_file() and f.stat().st_size > 0 for f in recipe_files)

        # If no recipe files found, use sample data
        if not has_recipe_files:
            logger.info(f"📭 No recipe files found in {folder_path}, using sample data")
            folder_path = "./data"  # Use data folder where sample_recipes.json is located
            folder = Path(folder_path)

        if not folder.exists():
            logger.error(f"❌ Folder does not exist: {folder.absolute()}")
            return documents

        # Text splitter for chunking
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        # Process all text-based files uniformly
        for file_path in folder.rglob("*"):
            if file_path.is_file():
                try:
                    # Read file content with UTF-8 encoding
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()

                    # Skip empty files
                    if not content.strip():
                        continue

                    # Handle JSON files specially to format them properly
                    if file_path.suffix.lower() == '.json':
                        formatted_content = self._format_json_recipes(content, file_path)
                        if formatted_content:
                            content = formatted_content

                    # Split content into chunks using text splitter
                    chunks = text_splitter.split_text(content)

                    # Create documents for each chunk
                    for i, chunk in enumerate(chunks):
                        documents.append(Document(
                            page_content=chunk,
                            metadata={
                                "source": str(file_path),
                                "filename": file_path.name,
                                "chunk_index": i,
                                "file_type": file_path.suffix
                            }
                        ))
                except Exception as e:
                    logger.error(f"❌ Error loading {file_path}: {e}")
                    continue

        logger.info(f"✅ Loaded and chunked {len(documents)} document segments")
        return documents

    def _format_json_recipes(self, json_content: str, file_path: Path) -> Optional[str]:
        """Format JSON recipe data into readable text format similar to MongoDB output.

        Accepts a single recipe object or a list of recipe objects. Returns
        the formatted text, or None when the JSON is invalid or has an
        unexpected structure (callers then fall back to the raw content).
        """
        try:
            # Uses the module-level json import (a duplicate local import was removed).
            recipes = json.loads(json_content)

            # Handle both single recipe object and array of recipes
            if isinstance(recipes, dict):
                recipes = [recipes]
            elif not isinstance(recipes, list):
                logger.warning(f"⚠️ Unexpected JSON structure in {file_path}")
                return None

            formatted_recipes = []
            for recipe in recipes:
                if not isinstance(recipe, dict):
                    continue

                # Extract recipe components
                title = recipe.get("title", "Untitled Recipe")
                ingredients = recipe.get("ingredients", [])
                instructions = recipe.get("instructions", "")

                # Format similar to MongoDB output
                formatted_content = f"Recipe: {title}\n"

                if ingredients:
                    if isinstance(ingredients, list):
                        formatted_content += f"Ingredients: {', '.join(ingredients)}\n"
                    else:
                        formatted_content += f"Ingredients: {ingredients}\n"

                if instructions:
                    # Handle both string and list instructions
                    if isinstance(instructions, list):
                        formatted_content += f"Instructions: {' '.join(instructions)}"
                    else:
                        formatted_content += f"Instructions: {instructions}"

                # Add metadata if available
                metadata = recipe.get("metadata", {})
                if metadata:
                    formatted_content += f"\n"
                    for key, value in metadata.items():
                        if key in ["cook_time", "difficulty", "servings", "category"]:
                            formatted_content += f"{key.replace('_', ' ').title()}: {value}\n"

                formatted_recipes.append(formatted_content)

            # Join all recipes with double newlines
            result = "\n\n".join(formatted_recipes)
            logger.info(f"✅ Formatted {len(recipes)} JSON recipes from {file_path.name}")
            return result

        except json.JSONDecodeError as e:
            logger.error(f"❌ Invalid JSON in {file_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"❌ Error formatting JSON recipes from {file_path}: {e}")
            return None

    def get_retriever(self):
        """Get retriever for use with ConversationalRetrievalChain.

        Both providers use the same search configuration (k=5); only the
        log message differs, so the redundant always-true hasattr guard and
        duplicated branches were collapsed.
        """
        logger.info("🔍 Creating retriever from vector store...")

        # For both ChromaDB and MongoDB Atlas, create standard retriever
        retriever = self.vector_store.as_retriever()
        retriever.search_kwargs = {"k": 5}

        if 'MongoDB' in self.vector_store.__class__.__name__:
            logger.info("🔍 MongoDB Atlas retriever configured with k=5")
        else:
            logger.info("🔍 ChromaDB retriever configured with k=5")

        return retriever


# Create global vector store service instance
vector_store_service = VectorStoreService()