# Jesse Johnson
# New commit for backend deployment: 2025-09-25_13-24-03
# c59d808
# Vector Store Service - Simple setup for retriever use
import json
import os
import shutil
from typing import List, Dict, Any, Optional
from pathlib import Path
# Core LangChain imports (always needed)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
# Local imports
from backend.config.settings import settings
from backend.config.database import db_settings
from backend.config.logging_config import get_logger
# MongoDB imports
from pymongo import MongoClient
from backend.services.custom_mongo_vector import CustomMongoDBVectorStore, VectorSearchOptions
# Setup logging
# Module-scoped logger; every method in this module logs through it.
logger = get_logger("vector_store")
class VectorStoreService:
    """Simple vector store service - creates or retrieves vector store for retriever use.

    Construction is eager: it resolves the embeddings provider from
    ``settings.get_embedding_config()`` and then loads (or creates) the
    vector store described by ``db_settings.get_vector_store_config()``.
    Any failure during setup is logged and re-raised.
    """

    def __init__(self):
        logger.info("πŸ“š Initializing Vector Store Service...")
        try:
            # Embeddings must exist first: the vector store uses them to
            # embed documents and queries.
            self.embeddings = self._get_embeddings()
            logger.info("βœ… Embeddings setup completed")
            self.vector_store = self._get_or_create_vector_store()
            logger.info("βœ… Vector store setup completed")
            logger.info("πŸš€ Vector Store Service initialization successful")
        except Exception as e:
            logger.error(f"❌ Vector Store Service initialization failed: {str(e)}", exc_info=True)
            raise

    def _get_embeddings(self):
        """Get embeddings provider based on configuration with conditional imports.

        Returns:
            A LangChain-compatible embeddings object for the configured
            provider (``openai``, ``google``, ``huggingface`` or ``ollama``).
            Unknown providers fall back to a default OpenAIEmbeddings.

        Raises:
            ImportError: when the package required by the selected provider
                is not installed.
        """
        embedding_config = settings.get_embedding_config()
        provider = embedding_config["provider"]
        logger.info(f"πŸ”§ Setting up embeddings provider: {provider}")
        if provider == "openai":
            try:
                from langchain_openai import OpenAIEmbeddings
                logger.info("βœ… OpenAI embeddings imported successfully")
                return OpenAIEmbeddings(
                    openai_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"❌ OpenAI embeddings not available: {e}")
                raise ImportError("OpenAI provider selected but langchain_openai not installed")
        elif provider == "google":
            try:
                from langchain_google_genai import GoogleGenerativeAIEmbeddings
                logger.info("βœ… Google embeddings imported successfully")
                return GoogleGenerativeAIEmbeddings(
                    google_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"❌ Google embeddings not available: {e}")
                raise ImportError("Google provider selected but langchain_google_genai not installed")
        elif provider == "huggingface":
            try:
                # Try modern langchain-huggingface first
                from langchain_huggingface import HuggingFaceEmbeddings
                logger.info("βœ… HuggingFace embeddings imported successfully")
                return HuggingFaceEmbeddings(
                    model_name=embedding_config["model"]
                )
            except ImportError:
                try:
                    # Fallback to sentence-transformers directly
                    from sentence_transformers import SentenceTransformer
                    logger.warning("⚠️ Using sentence-transformers directly (langchain-huggingface not available)")
                    # Return a wrapper that mimics the embeddings interface
                    return self._create_sentence_transformer_wrapper(embedding_config["model"])
                except ImportError as e:
                    logger.error(f"❌ HuggingFace embeddings not available: {e}")
                    logger.error("πŸ’‘ To fix this, install sentence-transformers: pip install sentence-transformers")
                    raise ImportError("HuggingFace provider selected but sentence-transformers not installed. Run: pip install sentence-transformers")
        elif provider == "ollama":
            try:
                from langchain_community.embeddings import OllamaEmbeddings
                logger.info("βœ… Ollama embeddings imported successfully")
                return OllamaEmbeddings(
                    base_url=embedding_config["base_url"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"❌ Ollama embeddings not available: {e}")
                raise ImportError("Ollama provider selected but langchain_community not installed")
        else:
            logger.warning(f"⚠️ Unknown embedding provider '{provider}', falling back to OpenAI")
            try:
                from langchain_openai import OpenAIEmbeddings
                return OpenAIEmbeddings()
            except ImportError:
                logger.error("❌ No valid embedding provider available")
                raise ImportError("No valid embedding provider available")

    def _create_sentence_transformer_wrapper(self, model_name):
        """Create a simple wrapper for sentence-transformers to work with LangChain.

        Fix: LangChain vector stores call ``embed_documents``/``embed_query``
        on the embeddings object; the previous wrapper only exposed
        ``encode``, so document ingestion (e.g. ``Chroma.from_documents``)
        would fail with AttributeError. ``embed_documents`` is added and
        ``encode`` kept as a backward-compatible alias.
        """
        from sentence_transformers import SentenceTransformer

        class SentenceTransformerWrapper:
            def __init__(self, model_name):
                self.model = SentenceTransformer(model_name)

            def encode(self, texts):
                # Backward-compatible alias for embed_documents.
                return self.model.encode(texts).tolist()

            def embed_documents(self, texts):
                # LangChain Embeddings interface: list[str] -> list[list[float]]
                return self.model.encode(texts).tolist()

            def embed_query(self, text):
                # LangChain Embeddings interface: str -> list[float]
                return self.model.encode([text])[0].tolist()

        return SentenceTransformerWrapper(model_name)

    def _get_or_create_vector_store(self):
        """Get or create vector store with conditional imports.

        For ``chromadb`` the store is persisted on disk and may be refreshed
        (deleted and rebuilt from the documents folder) when
        ``refresh_on_start`` is set. For ``mongodb`` an existing Atlas
        collection with pre-computed embeddings is wrapped. Unknown
        providers fall back to a local ChromaDB collection.

        Raises:
            ImportError: required vector-store package not installed.
            ConnectionError: MongoDB Atlas is configured but unreachable.
        """
        db_config = db_settings.get_vector_store_config()
        provider = db_config["provider"]
        if provider == "chromadb":
            try:
                from langchain_chroma import Chroma
                persist_dir = Path(db_config["persist_directory"])
                collection_name = db_config["collection_name"]
                refresh_on_start = db_config.get("refresh_on_start", False)
                # Check if refresh is requested: wipe the on-disk store so it
                # is rebuilt from source documents below.
                if refresh_on_start and persist_dir.exists():
                    logger.info(f"πŸ”„ CHROMADB_REFRESH_ON_START=true - Deleting existing ChromaDB at {persist_dir}")
                    shutil.rmtree(persist_dir)
                    logger.info(f"βœ… Existing ChromaDB deleted successfully")
                # Check if persisted database exists (non-empty directory)
                if persist_dir.exists() and any(persist_dir.iterdir()):
                    logger.info(f"πŸ“‚ Loading existing ChromaDB from {persist_dir}")
                    return Chroma(
                        collection_name=collection_name,
                        embedding_function=self.embeddings,
                        persist_directory=str(persist_dir)
                    )
                else:
                    # Create new vector store with documents
                    logger.info(f"πŸ†• Creating new ChromaDB at {persist_dir}")
                    documents = self._load_documents_from_folder()
                    if documents:
                        vector_store = Chroma.from_documents(
                            documents=documents,
                            embedding=self.embeddings,
                            collection_name=collection_name,
                            persist_directory=str(persist_dir)
                        )
                        logger.info(f"βœ… Created ChromaDB with {len(documents)} document chunks")
                        return vector_store
                    else:
                        logger.info("πŸ“ No documents found, creating empty ChromaDB")
                        return Chroma(
                            collection_name=collection_name,
                            embedding_function=self.embeddings,
                            persist_directory=str(persist_dir)
                        )
            except ImportError as e:
                logger.error(f"❌ ChromaDB not available: {e}")
                raise ImportError("ChromaDB provider selected but langchain_chroma not installed")
        elif provider == "mongodb":
            try:
                logger.info("πŸ”— Setting up MongoDB Atlas connection...")
                client = MongoClient(db_config["uri"])
                # Cheap round-trip to fail fast on bad credentials/URI.
                client.admin.command('ping')
                logger.info(f"βœ… MongoDB Atlas connection verified")
                # Get the collection
                database = client[db_config["database"]]
                collection = database[db_config["collection_name"]]
                # Create streamlined vector store with Atlas Vector Search
                options = VectorSearchOptions(
                    index_name=db_config.get("index_name", "vector_index"),
                    embedding_key=db_config.get("vector_field", "ingredients_emb"),
                    text_key="title",
                    num_candidates=db_config.get("num_candidates", 50),
                    similarity_metric=db_config.get("similarity_metric", "cosine")
                )
                vector_store = CustomMongoDBVectorStore(
                    collection=collection,
                    embedding_function=self.embeddings,
                    options=options
                )
                logger.info(f"βœ… Custom MongoDB Vector Store created successfully")
                logger.info("🎯 Using pre-existing embeddings without requiring vector search index")
                return vector_store
            except ImportError as e:
                logger.error(f"❌ MongoDB packages not available: {e}")
                raise ImportError("MongoDB provider selected but langchain-mongodb not installed. Run: pip install langchain-mongodb pymongo")
            except Exception as e:
                logger.error(f"❌ MongoDB Atlas connection failed: {e}")
                raise ConnectionError(f"Failed to connect to MongoDB Atlas: {e}")
        else:
            logger.warning(f"⚠️ Unknown vector store provider '{provider}', falling back to ChromaDB")
            try:
                from langchain_chroma import Chroma
                return Chroma(
                    collection_name="fallback_collection",
                    embedding_function=self.embeddings,
                    persist_directory="./vector_store/fallback_chroma"
                )
            except ImportError:
                logger.error("❌ No valid vector store provider available")
                raise ImportError("No valid vector store provider available")

    def _load_documents_from_folder(self, folder_path: str = "./data/recipes") -> List[Document]:
        """Load and chunk all documents from folder with UTF-8 encoding, fallback to sample data.

        Args:
            folder_path: directory scanned recursively for text files.

        Returns:
            List of chunked ``Document`` objects (empty when neither the
            folder nor the sample-data fallback exists). Files that cannot
            be read as UTF-8 are logged and skipped.
        """
        logger.info(f"πŸ“„ Loading documents from: {folder_path}")
        documents = []
        folder = Path(folder_path)
        # Check if folder exists and has files
        has_recipe_files = False
        if folder.exists():
            # Check if there are any non-empty files in the recipes folder
            recipe_files = list(folder.rglob("*"))
            has_recipe_files = any(f.is_file() and f.stat().st_size > 0 for f in recipe_files)
        # If no recipe files found, use sample data
        if not has_recipe_files:
            logger.info(f"πŸ“­ No recipe files found in {folder_path}, using sample data")
            folder_path = "./data"  # Use data folder where sample_recipes.json is located
            folder = Path(folder_path)
            if not folder.exists():
                logger.error(f"❌ Folder does not exist: {folder.absolute()}")
                return documents
        # Text splitter for chunking
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        # Process all text-based files uniformly
        for file_path in folder.rglob("*"):
            if file_path.is_file():
                try:
                    # Read file content with UTF-8 encoding
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                    # Skip empty files
                    if not content.strip():
                        continue
                    # Handle JSON files specially to format them properly
                    if file_path.suffix.lower() == '.json':
                        formatted_content = self._format_json_recipes(content, file_path)
                        if formatted_content:
                            content = formatted_content
                    # Split content into chunks using text splitter
                    chunks = text_splitter.split_text(content)
                    # Create documents for each chunk
                    for i, chunk in enumerate(chunks):
                        documents.append(Document(
                            page_content=chunk,
                            metadata={
                                "source": str(file_path),
                                "filename": file_path.name,
                                "chunk_index": i,
                                "file_type": file_path.suffix
                            }
                        ))
                except Exception as e:
                    logger.error(f"❌ Error loading {file_path}: {e}")
                    continue
        logger.info(f"βœ… Loaded and chunked {len(documents)} document segments")
        return documents

    def _format_json_recipes(self, json_content: str, file_path: Path) -> Optional[str]:
        """Format JSON recipe data into readable text format similar to MongoDB output.

        Args:
            json_content: raw JSON text (single recipe object or array of them).
            file_path: origin of the content, used only for log messages.

        Returns:
            Human-readable text with one section per recipe, or ``None``
            when the JSON is invalid or has an unexpected structure.
            (Annotation fixed to ``Optional[str]``: the error paths return
            ``None``.)
        """
        try:
            recipes = json.loads(json_content)
            # Handle both single recipe object and array of recipes
            if isinstance(recipes, dict):
                recipes = [recipes]
            elif not isinstance(recipes, list):
                logger.warning(f"⚠️ Unexpected JSON structure in {file_path}")
                return None
            formatted_recipes = []
            for recipe in recipes:
                if not isinstance(recipe, dict):
                    continue
                # Extract recipe components
                title = recipe.get("title", "Untitled Recipe")
                ingredients = recipe.get("ingredients", [])
                instructions = recipe.get("instructions", "")
                # Format similar to MongoDB output
                formatted_content = f"Recipe: {title}\n"
                if ingredients:
                    if isinstance(ingredients, list):
                        formatted_content += f"Ingredients: {', '.join(ingredients)}\n"
                    else:
                        formatted_content += f"Ingredients: {ingredients}\n"
                if instructions:
                    # Handle both string and list instructions
                    if isinstance(instructions, list):
                        formatted_content += f"Instructions: {' '.join(instructions)}"
                    else:
                        formatted_content += f"Instructions: {instructions}"
                # Add metadata if available (only a whitelisted set of keys)
                metadata = recipe.get("metadata", {})
                if metadata:
                    formatted_content += f"\n"
                    for key, value in metadata.items():
                        if key in ["cook_time", "difficulty", "servings", "category"]:
                            formatted_content += f"{key.replace('_', ' ').title()}: {value}\n"
                formatted_recipes.append(formatted_content)
            # Join all recipes with double newlines
            result = "\n\n".join(formatted_recipes)
            logger.info(f"βœ… Formatted {len(recipes)} JSON recipes from {file_path.name}")
            return result
        except json.JSONDecodeError as e:
            logger.error(f"❌ Invalid JSON in {file_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"❌ Error formatting JSON recipes from {file_path}: {e}")
            return None

    def get_retriever(self):
        """Get retriever for use with ConversationalRetrievalChain.

        Both providers currently use the same top-k; only the log message
        differs by provider (the always-true ``hasattr(x, '__class__')``
        guard and duplicated branch bodies were removed).
        """
        logger.info("πŸ” Creating retriever from vector store...")
        # For both ChromaDB and MongoDB Atlas, create standard retriever
        retriever = self.vector_store.as_retriever()
        retriever.search_kwargs = {"k": 5}
        if 'MongoDB' in self.vector_store.__class__.__name__:
            logger.info("πŸ” MongoDB Atlas retriever configured with k=5")
        else:
            logger.info("πŸ” ChromaDB retriever configured with k=5")
        return retriever
# Create global vector store service instance.
# NOTE: constructed at import time, so importing this module immediately
# builds the embeddings provider and connects to / creates the vector
# store; import fails hard if either setup step raises.
vector_store_service = VectorStoreService()