|
|
|
|
|
import json |
|
|
import os |
|
|
import shutil |
|
|
from typing import List, Dict, Any, Optional |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain.schema import Document |
|
|
|
|
|
|
|
|
from backend.config.settings import settings |
|
|
from backend.config.database import db_settings |
|
|
from backend.config.logging_config import get_logger |
|
|
|
|
|
|
|
|
from pymongo import MongoClient |
|
|
from backend.services.custom_mongo_vector import CustomMongoDBVectorStore, VectorSearchOptions |
|
|
|
|
|
|
|
|
# Module-scoped logger shared by all VectorStoreService methods below.
logger = get_logger("vector_store")
|
|
|
|
|
class VectorStoreService:
    """Build or load a vector store and expose it for retriever use.

    Construction wires together two lazily-imported components:

    * an embeddings provider chosen via ``settings.get_embedding_config()``
      (``openai`` / ``google`` / ``huggingface`` / ``ollama``), and
    * a vector store chosen via ``db_settings.get_vector_store_config()``
      (``chromadb`` on local disk, or a custom MongoDB Atlas store that
      reuses pre-existing embeddings).

    NOTE: instantiation performs real I/O (package imports, disk access,
    database connections) and re-raises any failure after logging it.
    """

    def __init__(self):
        logger.info("Initializing Vector Store Service...")

        try:
            self.embeddings = self._get_embeddings()
            logger.info("Embeddings setup completed")

            self.vector_store = self._get_or_create_vector_store()
            logger.info("Vector store setup completed")

            logger.info("Vector Store Service initialization successful")

        except Exception as e:
            # Fail fast: callers must never receive a half-initialized service.
            logger.error(f"Vector Store Service initialization failed: {str(e)}", exc_info=True)
            raise

    def _get_embeddings(self):
        """Return an embeddings object for the configured provider.

        Each branch imports its provider package lazily so that only the
        selected provider's dependency has to be installed.

        Raises:
            ImportError: if the configured provider's package (or the
                fallback provider) is not installed.
        """
        embedding_config = settings.get_embedding_config()
        provider = embedding_config["provider"]

        logger.info(f"Setting up embeddings provider: {provider}")

        if provider == "openai":
            try:
                from langchain_openai import OpenAIEmbeddings
                logger.info("OpenAI embeddings imported successfully")
                return OpenAIEmbeddings(
                    openai_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"OpenAI embeddings not available: {e}")
                raise ImportError("OpenAI provider selected but langchain_openai not installed")

        elif provider == "google":
            try:
                from langchain_google_genai import GoogleGenerativeAIEmbeddings
                logger.info("Google embeddings imported successfully")
                return GoogleGenerativeAIEmbeddings(
                    google_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"Google embeddings not available: {e}")
                raise ImportError("Google provider selected but langchain_google_genai not installed")

        elif provider == "huggingface":
            try:
                from langchain_huggingface import HuggingFaceEmbeddings
                logger.info("HuggingFace embeddings imported successfully")
                return HuggingFaceEmbeddings(
                    model_name=embedding_config["model"]
                )
            except ImportError:
                # Second chance: drive sentence-transformers directly through a
                # small adapter when langchain-huggingface is missing.
                try:
                    from sentence_transformers import SentenceTransformer  # availability probe only
                    logger.warning("Using sentence-transformers directly (langchain-huggingface not available)")
                    return self._create_sentence_transformer_wrapper(embedding_config["model"])
                except ImportError as e:
                    logger.error(f"HuggingFace embeddings not available: {e}")
                    logger.error("To fix this, install sentence-transformers: pip install sentence-transformers")
                    raise ImportError("HuggingFace provider selected but sentence-transformers not installed. Run: pip install sentence-transformers")

        elif provider == "ollama":
            try:
                from langchain_community.embeddings import OllamaEmbeddings
                logger.info("Ollama embeddings imported successfully")
                return OllamaEmbeddings(
                    base_url=embedding_config["base_url"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"Ollama embeddings not available: {e}")
                raise ImportError("Ollama provider selected but langchain_community not installed")

        else:
            # Unknown provider: fall back to OpenAI defaults (reads the API key
            # from the environment).
            logger.warning(f"Unknown embedding provider '{provider}', falling back to OpenAI")
            try:
                from langchain_openai import OpenAIEmbeddings
                return OpenAIEmbeddings()
            except ImportError:
                logger.error("No valid embedding provider available")
                raise ImportError("No valid embedding provider available")

    def _create_sentence_transformer_wrapper(self, model_name):
        """Wrap sentence-transformers in the minimal LangChain embeddings API.

        FIX: LangChain vector stores call ``embed_documents(texts)`` and
        ``embed_query(text)``; the previous wrapper only exposed ``encode``,
        so indexing documents through this wrapper would raise
        ``AttributeError``. ``embed_documents`` is added here and ``encode``
        is kept for backward compatibility.
        """
        from sentence_transformers import SentenceTransformer

        class SentenceTransformerWrapper:
            def __init__(self, model_name):
                self.model = SentenceTransformer(model_name)

            def encode(self, texts):
                # Kept for backward compatibility with earlier callers.
                return self.model.encode(texts).tolist()

            def embed_documents(self, texts):
                # LangChain embeddings interface: list[str] -> list[list[float]]
                return self.model.encode(texts).tolist()

            def embed_query(self, text):
                # LangChain embeddings interface: str -> list[float]
                return self.model.encode([text])[0].tolist()

        return SentenceTransformerWrapper(model_name)

    def _get_or_create_vector_store(self):
        """Build (or load) the vector store for the configured provider.

        ``chromadb``: loads an existing on-disk store, optionally wiping it
        first (``refresh_on_start``), otherwise creates one from the documents
        in the data folder. ``mongodb``: connects to Atlas and wraps an
        existing collection that already carries embeddings.

        Raises:
            ImportError: when the provider's package is missing.
            ConnectionError: when the MongoDB Atlas connection fails.
        """
        db_config = db_settings.get_vector_store_config()
        provider = db_config["provider"]

        if provider == "chromadb":
            try:
                from langchain_chroma import Chroma

                persist_dir = Path(db_config["persist_directory"])
                collection_name = db_config["collection_name"]
                refresh_on_start = db_config.get("refresh_on_start", False)

                # Optionally wipe the on-disk store so it is rebuilt from
                # source documents on this start.
                if refresh_on_start and persist_dir.exists():
                    logger.info(f"CHROMADB_REFRESH_ON_START=true - Deleting existing ChromaDB at {persist_dir}")
                    shutil.rmtree(persist_dir)
                    logger.info("Existing ChromaDB deleted successfully")

                if persist_dir.exists() and any(persist_dir.iterdir()):
                    # Non-empty persist directory: reuse what is on disk.
                    logger.info(f"Loading existing ChromaDB from {persist_dir}")
                    return Chroma(
                        collection_name=collection_name,
                        embedding_function=self.embeddings,
                        persist_directory=str(persist_dir)
                    )
                else:
                    logger.info(f"Creating new ChromaDB at {persist_dir}")
                    documents = self._load_documents_from_folder()

                    if documents:
                        vector_store = Chroma.from_documents(
                            documents=documents,
                            embedding=self.embeddings,
                            collection_name=collection_name,
                            persist_directory=str(persist_dir)
                        )
                        logger.info(f"Created ChromaDB with {len(documents)} document chunks")
                        return vector_store
                    else:
                        # No source documents: still return a usable, empty store.
                        logger.info("No documents found, creating empty ChromaDB")
                        return Chroma(
                            collection_name=collection_name,
                            embedding_function=self.embeddings,
                            persist_directory=str(persist_dir)
                        )
            except ImportError as e:
                logger.error(f"ChromaDB not available: {e}")
                raise ImportError("ChromaDB provider selected but langchain_chroma not installed")

        elif provider == "mongodb":
            try:
                logger.info("Setting up MongoDB Atlas connection...")
                client = MongoClient(db_config["uri"])
                client.admin.command('ping')  # fail early on a bad URI/credentials
                logger.info("MongoDB Atlas connection verified")
                # FIX: was a bare print() to stdout; demoted to a debug log.
                logger.debug(f"Databases visible to client: {client.list_database_names()}")

                database = client[db_config["database"]]
                collection = database[db_config["collection_name"]]

                # Search options for documents that already carry embeddings.
                options = VectorSearchOptions(
                    index_name=db_config.get("index_name", "vector_index"),
                    embedding_key=db_config.get("vector_field", "ingredients_emb"),
                    text_key="title",
                    num_candidates=db_config.get("num_candidates", 50),
                    similarity_metric=db_config.get("similarity_metric", "cosine")
                )

                vector_store = CustomMongoDBVectorStore(
                    collection=collection,
                    embedding_function=self.embeddings,
                    options=options
                )

                logger.info("Custom MongoDB Vector Store created successfully")
                logger.info("Using pre-existing embeddings without requiring vector search index")
                return vector_store

            except ImportError as e:
                logger.error(f"MongoDB packages not available: {e}")
                raise ImportError("MongoDB provider selected but langchain-mongodb not installed. Run: pip install langchain-mongodb pymongo")
            except Exception as e:
                logger.error(f"MongoDB Atlas connection failed: {e}")
                raise ConnectionError(f"Failed to connect to MongoDB Atlas: {e}")

        else:
            # Unknown provider: fall back to a local ChromaDB store.
            logger.warning(f"Unknown vector store provider '{provider}', falling back to ChromaDB")
            try:
                from langchain_chroma import Chroma
                return Chroma(
                    collection_name="fallback_collection",
                    embedding_function=self.embeddings,
                    persist_directory="./vector_store/fallback_chroma"
                )
            except ImportError:
                logger.error("No valid vector store provider available")
                raise ImportError("No valid vector store provider available")

    def _load_documents_from_folder(self, folder_path: str = "./data/recipes") -> "List[Document]":
        """Load every file under *folder_path* and split it into chunks.

        Falls back to ``./data`` (sample data) when the recipes folder is
        missing or contains no non-empty files. JSON files are reformatted
        into readable recipe text before chunking; unreadable files are
        logged and skipped (best-effort loading).

        Returns:
            List of LangChain ``Document`` chunks with source metadata;
            empty when no usable folder exists.
        """
        logger.info(f"Loading documents from: {folder_path}")

        documents = []
        folder = Path(folder_path)

        # Does the recipes folder contain at least one non-empty file?
        has_recipe_files = False
        if folder.exists():
            recipe_files = list(folder.rglob("*"))
            has_recipe_files = any(f.is_file() and f.stat().st_size > 0 for f in recipe_files)

        if not has_recipe_files:
            logger.info(f"No recipe files found in {folder_path}, using sample data")
            folder_path = "./data"
            folder = Path(folder_path)

        if not folder.exists():
            logger.error(f"Folder does not exist: {folder.absolute()}")
            return documents

        # Chunking parameters sized for recipe-length documents.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        for file_path in folder.rglob("*"):
            if file_path.is_file():
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()

                    if not content.strip():
                        continue  # skip whitespace-only files

                    # JSON recipe files are rewritten into prose first; on
                    # formatting failure the raw JSON text is kept as-is.
                    if file_path.suffix.lower() == '.json':
                        formatted_content = self._format_json_recipes(content, file_path)
                        if formatted_content:
                            content = formatted_content

                    chunks = text_splitter.split_text(content)

                    for i, chunk in enumerate(chunks):
                        documents.append(Document(
                            page_content=chunk,
                            metadata={
                                "source": str(file_path),
                                "filename": file_path.name,
                                "chunk_index": i,
                                "file_type": file_path.suffix
                            }
                        ))

                except Exception as e:
                    # Best-effort: one bad file must not abort the whole load.
                    logger.error(f"Error loading {file_path}: {e}")
                    continue

        logger.info(f"Loaded and chunked {len(documents)} document segments")
        return documents

    def _format_json_recipes(self, json_content: str, file_path: "Path") -> "Optional[str]":
        """Format JSON recipe data into readable text.

        Accepts either a single recipe object or a list of recipe objects.
        Each recipe contributes a "Recipe/Ingredients/Instructions" section
        plus selected metadata fields; recipes are joined by blank lines.

        Returns:
            The formatted text, or ``None`` when the JSON is invalid or has
            an unexpected structure (callers then keep the raw content).
        """
        try:
            import json
            recipes = json.loads(json_content)

            # Normalize: a single recipe object becomes a one-element list.
            if isinstance(recipes, dict):
                recipes = [recipes]
            elif not isinstance(recipes, list):
                logger.warning(f"Unexpected JSON structure in {file_path}")
                return None

            formatted_recipes = []

            for recipe in recipes:
                if not isinstance(recipe, dict):
                    continue  # ignore non-object entries

                title = recipe.get("title", "Untitled Recipe")
                ingredients = recipe.get("ingredients", [])
                instructions = recipe.get("instructions", "")

                formatted_content = f"Recipe: {title}\n"

                if ingredients:
                    if isinstance(ingredients, list):
                        formatted_content += f"Ingredients: {', '.join(ingredients)}\n"
                    else:
                        formatted_content += f"Ingredients: {ingredients}\n"

                if instructions:
                    if isinstance(instructions, list):
                        formatted_content += f"Instructions: {' '.join(instructions)}"
                    else:
                        formatted_content += f"Instructions: {instructions}"

                # Append a whitelisted subset of metadata fields, if present.
                metadata = recipe.get("metadata", {})
                if metadata:
                    formatted_content += f"\n"
                    for key, value in metadata.items():
                        if key in ["cook_time", "difficulty", "servings", "category"]:
                            formatted_content += f"{key.replace('_', ' ').title()}: {value}\n"

                formatted_recipes.append(formatted_content)

            result = "\n\n".join(formatted_recipes)
            logger.info(f"Formatted {len(recipes)} JSON recipes from {file_path.name}")
            return result

        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {file_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error formatting JSON recipes from {file_path}: {e}")
            return None

    def get_retriever(self):
        """Return a LangChain retriever over the underlying vector store.

        Both backends use the same top-k (k=5); only the log message differs.
        """
        logger.info("Creating retriever from vector store...")

        retriever = self.vector_store.as_retriever()
        retriever.search_kwargs = {"k": 5}

        # FIX: the previous version had two identical branches guarded by a
        # vacuous hasattr(x, '__class__') check; only the log line differs.
        if 'MongoDB' in self.vector_store.__class__.__name__:
            logger.info("MongoDB Atlas retriever configured with k=5")
        else:
            logger.info("ChromaDB retriever configured with k=5")

        return retriever
|
|
|
|
|
|
|
|
# Module-level singleton: constructing the service here runs embedding and
# vector-store setup (disk/network I/O) at import time and raises on failure.
vector_store_service = VectorStoreService()