"""
semantic_search.py
------------------
Semantic search over an Amazon product catalogue using FAISS + HuggingFace
embeddings.

Expected inputs
---------------
- metadata_dataset : datasets.Dataset — one row per product (raw_metadata["full"])
- reviews_dataset  : datasets.Dataset — passed to get_best_reviews(reviews, asin, k)

Typical usage
-------------
    docs = [create_document(row, raw_reviews) for row in raw_metadata["full"]]
    store = build_vector_store([d for d in docs if d is not None])
    results = enrich_search_results(store, "noise cancelling headphones", k=5)

    # Large catalogues: build in resumable, checkpointed batches, then reload.
    store = build_and_save_vector_store(raw_metadata["full"], raw_reviews, "faiss_index")
    store = load_vector_store("faiss_index")
"""
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any

import streamlit as st
import torch
import faiss
from datasets import Dataset
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings

# Put the directory two levels above this file on sys.path so the `src.*`
# imports below resolve even when this file is not imported as part of a package.
ROOT_FOLDER = Path(__file__).resolve().parent.parent
sys.path.append(str(ROOT_FOLDER))

from src.eda_helpers import get_best_reviews  # noqa: E402
from src.utils import decode_ratings, extract_image  # noqa: E402

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

DEFAULT_EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
DEFAULT_TOP_REVIEWS = 5
DEFAULT_TOP_K = 5
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Half precision saves GPU memory and time; on CPU many kernels have no (or only
# slow) float16 implementation, so keep full precision there.
EMBEDDING_DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32


@st.cache_resource(show_spinner=False)
def get_embeddings() -> HuggingFaceEmbeddings:
    """Return the shared HuggingFaceEmbeddings instance for ``DEFAULT_EMBEDDING_MODEL``.

    Wrapped in ``st.cache_resource`` so the model is loaded once and reused
    across Streamlit reruns. The model runs on ``DEVICE`` in
    ``EMBEDDING_DTYPE``; embeddings are L2-normalised, and the encode batch
    size is larger on GPU (512) than on CPU (128).
    """
    return HuggingFaceEmbeddings(
        model_name=DEFAULT_EMBEDDING_MODEL,
        model_kwargs={
            "device": DEVICE,
            # Forwarded to the underlying transformers `from_pretrained` call.
            "model_kwargs": {"torch_dtype": EMBEDDING_DTYPE},
        },
        encode_kwargs={
            "batch_size": 128 if DEVICE == "cpu" else 512,
            "normalize_embeddings": True,
        },
    )


# ---------------------------------------------------------------------------
# Document construction
# ---------------------------------------------------------------------------
def _format_review(review) -> str:
    """Return a compact ``[rating★] title — text`` string for one review.

    A missing rating renders as ``?``; a missing title or text renders as an
    empty string (both are whitespace-stripped).
    """
    rating = review.get("rating", "?")
    title = (review.get("title") or "").strip()
    text = (review.get("text") or "").strip()
    return f"[{rating}★] {title} — {text}"


def _build_reviews_block(
    reviews: Dataset,
    parent_asin: str,
    k: int = DEFAULT_TOP_REVIEWS,
) -> tuple[int, str]:
    """
    Fetch the top-*k* reviews for *parent_asin* and format them as one text block.

    Returns:
        ``(total, block)``: *total* is the first value returned by
        ``get_best_reviews`` (presumably the product's total review count —
        confirm against that helper) and *block* holds one formatted review per
        line, continuation lines indented by one space. ``(0, "")`` when no
        reviews are found.
    """
    total, product_reviews = get_best_reviews(reviews, parent_asin, k)
    if not product_reviews:
        return 0, ""
    return total, "\n ".join(_format_review(r) for r in product_reviews)


def _build_page_content(product, review_block: str) -> str:
    """Assemble the text that will be embedded. Empty sections are omitted.

    Sections, in order: Product, Category Path, Features, Description,
    Top Reviews, Details. ``enrich_search_results`` passes this text to
    ``decode_ratings``, which presumably relies on these section labels —
    keep them stable.
    """
    title = (product.get("title") or "").strip()
    main_category = (product.get("main_category") or "").strip()
    sub_categories = " > ".join(product.get("categories") or [])
    # Join only the non-empty levels: a missing main category or category list
    # must not leave a dangling " >> " (nor force the section to appear).
    category_path = " >> ".join(p for p in (main_category, sub_categories) if p)
    features = "\n ".join(product.get("features") or [])
    description = " ".join(product.get("description") or [])
    # `details` is not guaranteed to be a str (create_document also coerces it).
    details = str(product.get("details") or "").strip()

    parts = [f"Product: {title}"]
    if category_path:
        parts.append(f"Category Path: {category_path}")
    if features:
        parts.append(f"Features:\n {features}")
    if description:
        parts.append(f"Description:\n {description}")
    if review_block:
        parts.append(f"Top Reviews:\n {review_block}")
    if details:
        parts.append(f"Details:\n {details}")
    return "\n".join(parts)


def create_document(product, reviews: Dataset) -> Document | None:
    """
    Build a :class:`~langchain_core.documents.Document` from one product row.

    Args:
        product: A single row from a HuggingFace metadata Dataset (dict-like).
        reviews: The full reviews Dataset, forwarded to ``get_best_reviews``.

    Returns:
        A Document, or ``None`` (with a logged warning) if the row has no
        ``parent_asin``.

    Notes:
        *page_content* contains only the text that influences embeddings.
        *metadata* stores values used for filtering and display after
        retrieval. Most values are scalars; ``categories`` is a list and
        ``details`` is coerced to ``str``.
    """
    parent_asin = product.get("parent_asin")
    if not parent_asin:
        logger.warning("Skipping product with missing parent_asin: %s", product.get("title"))
        return None

    total_reviews, review_block = _build_reviews_block(reviews, parent_asin)
    page_content = _build_page_content(product, review_block)

    metadata = {
        # --- identifiers ---
        "parent_asin": parent_asin,
        # --- numeric (filterable / rankable) ---
        "price": product.get("price"),
        "average_rating": product.get("average_rating"),
        "rating_number": product.get("rating_number"),
        # --- categorical (filterable); `or ""` also maps explicit None to "" ---
        "main_category": product.get("main_category") or "",
        "title": product.get("title") or "",
        "image": extract_image(product),
        "categories": product.get("categories") or [],
        # --- free-form (display only; coerced to str) ---
        "details": str(product.get("details") or ""),
        "total_reviews": total_reviews,
    }
    return Document(page_content=page_content, metadata=metadata)


# ---------------------------------------------------------------------------
# Vector store
# ---------------------------------------------------------------------------

def build_vector_store(
    docs: list[Document],
    existing_store: FAISS | None = None,
) -> FAISS:
    """
    Embed *docs* in one call and return (or update) a FAISS vector store.

    If ``existing_store`` is provided, documents are added to it in place and
    that same object is returned. Otherwise a new store backed by an exact
    ``faiss.IndexFlatL2`` index and an in-memory docstore is created.
    Document IDs are set to ``parent_asin``.

    Raises:
        ValueError: if *docs* is empty. LangChain's FAISS wrapper also raises
            ``ValueError`` for duplicate ids (within *docs* or already in the
            store), so deduplicate by ``parent_asin`` before calling.
    """
    if not docs:
        raise ValueError("Cannot build a vector store from an empty document list.")

    logger.info("Embedding on %s", DEVICE)
    embeddings = get_embeddings()

    if existing_store is None:
        # Embed a throwaway string once to learn the vector dimensionality.
        # get_embeddings() normalises vectors, so L2 ranking equals cosine ranking.
        dim = len(embeddings.embed_query("probe"))
        vector_store = FAISS(
            embedding_function=embeddings,
            index=faiss.IndexFlatL2(dim),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
        )
    else:
        vector_store = existing_store

    ids = [doc.metadata["parent_asin"] for doc in docs]
    vector_store.add_documents(documents=docs, ids=ids)
    logger.info("Indexed %d documents into FAISS.", len(docs))
    return vector_store


def _read_progress(progress_file: str) -> int:
    """Return the ``next_start`` row saved in *progress_file*, or 0 if the file is absent."""
    if not os.path.exists(progress_file):
        return 0
    with open(progress_file, encoding="utf-8") as f:
        next_start = json.load(f).get("next_start", 0)
    print(f"Resuming from row {next_start}.")
    return next_start


def _write_progress(progress_file: str, next_start: int) -> None:
    """Record *next_start* via write-then-``os.replace`` so a crash can't leave a truncated file."""
    tmp_file = progress_file + ".tmp"
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump({"next_start": next_start}, f)
    os.replace(tmp_file, progress_file)


def _collect_new_documents(batch, reviews: Dataset, already_indexed: set[str]) -> list[Document]:
    """Build Documents for rows in *batch* whose ``parent_asin`` is new, deduplicated within the batch."""
    docs = []
    seen = set()
    for row in batch:
        asin = row.get("parent_asin")
        # Skip known ids *before* create_document, which performs the review lookup.
        if asin and (asin in already_indexed or asin in seen):
            continue
        doc = create_document(row, reviews)
        if doc is None:
            continue
        seen.add(doc.metadata["parent_asin"])
        docs.append(doc)
    return docs


def build_and_save_vector_store(
    metadata_dataset: Dataset,
    reviews: Dataset,
    save_path: str,
    batch_size: int = 500,
) -> FAISS | None:
    """
    Build a FAISS vector store from a metadata Dataset in batches, saving after each batch.

    Resuming: if ``<save_path>/index.faiss`` exists, the store is loaded and
    every ``parent_asin`` already in it is skipped. ``progress.json`` records
    the next row to process. It is written only after the index is on disk,
    honoured only when the index exists, and removed once all rows are done.

    Args:
        metadata_dataset: One row per product.
        reviews: The full reviews Dataset, forwarded to ``create_document``.
        save_path: Directory for ``index.faiss`` / ``index.pkl`` / ``progress.json``.
        batch_size: Number of metadata rows per embedding batch (must be >= 1).

    Returns:
        The populated store, or ``None`` if no index existed and no row
        produced a document.

    Raises:
        ValueError: if *batch_size* is not positive.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    index_file = os.path.join(save_path, "index.faiss")
    progress_file = os.path.join(save_path, "progress.json")

    # --- Resume / initialize ---
    if os.path.exists(index_file):
        # NOTE: load_local unpickles index.pkl — only resume from trusted directories.
        vector_store = FAISS.load_local(
            save_path, get_embeddings(), allow_dangerous_deserialization=True
        )
        already_indexed = set(vector_store.index_to_docstore_id.values())
        print(f"Resuming — {len(already_indexed)} docs already indexed.")
        resume_start = _read_progress(progress_file)
    else:
        os.makedirs(save_path, exist_ok=True)
        vector_store = None  # created by build_vector_store on the first non-empty batch
        already_indexed = set()
        # A progress file without an index is stale; ignoring it avoids skipping rows.
        resume_start = 0

    total = len(metadata_dataset)
    for start in range(resume_start, total, batch_size):
        end = min(start + batch_size, total)
        batch = metadata_dataset.select(range(start, end))
        docs = _collect_new_documents(batch, reviews, already_indexed)

        if docs:
            vector_store = build_vector_store(docs=docs, existing_store=vector_store)
            already_indexed.update(doc.metadata["parent_asin"] for doc in docs)
            vector_store.save_local(save_path)
        if vector_store is not None:
            # Only after the index is saved, so progress never runs ahead of disk.
            _write_progress(progress_file, end)
        print(f"Indexed {end} / {total} rows")

    if os.path.exists(progress_file):
        os.remove(progress_file)

    return vector_store


# ---------------------------------------------------------------------------
# Search
# ---------------------------------------------------------------------------

def enrich_search_results(vector_store, query: str, k: int, filter=None):
    """
    Run a similarity search and flatten each hit into a JSON-safe dict.

    Args:
        vector_store: LangChain vector store supporting ``similarity_search_with_score``.
        query: Search query string.
        k: Number of results to return.
        filter: Optional metadata filter forwarded to ``similarity_search_with_score``.

    Returns:
        One dict per hit, in the order the store returns them. Each dict holds
        the document's metadata plus two keys:

        - ``score``: the store's raw score as a float. For the ``IndexFlatL2``
          stores built in this module it is a distance, so lower means closer.
        - ``reviews``: ``decode_ratings(page_content)``, or ``[]``.
    """
    hits = vector_store.similarity_search_with_score(query, k=k, filter=filter)

    results = []
    for doc, score in hits:
        record = {
            **doc.metadata,
            "score": float(score),
            "reviews": decode_ratings(doc.page_content) or [],
        }
        # Round-trip through JSON so every value is a plain JSON type; anything
        # json cannot encode natively is converted with str().
        results.append(json.loads(json.dumps(record, default=str)))
    return results


# ---------------------------------------------------------------------------
# Read existing vector store
# ---------------------------------------------------------------------------

def load_vector_store(
    load_path: str,
) -> FAISS:
    """Load a FAISS vector store previously written with ``save_local``.

    ``allow_dangerous_deserialization=True`` lets LangChain unpickle the
    docstore (``index.pkl``); only load directories you created or trust.
    """
    return FAISS.load_local(
        load_path,
        embeddings=get_embeddings(),
        allow_dangerous_deserialization=True,
    )