Spaces:
Sleeping
Sleeping
| """ | |
| semantic_search.py | |
| ------------------ | |
| Semantic search over an Amazon product catalogue using FAISS + HuggingFace embeddings. | |
| Expected inputs | |
| --------------- | |
| - metadata_dataset : datasets.Dataset — one row per product (raw_metadata["full"]) | |
| - reviews_dataset : datasets.Dataset — passed to get_best_reviews(reviews, asin, k) | |
| Typical usage | |
| ------------- | |
| docs = build_documents(raw_metadata["full"], raw_reviews, n=100) | |
| store = build_vector_store(docs) | |
| results = semantic_search("noise cancelling headphones", store, k=5) | |
| """ | |
| import logging | |
| from typing import Any | |
| import streamlit as st | |
| import torch | |
| import json, os, sys | |
| from pathlib import Path | |
| import faiss | |
| from datasets import Dataset | |
| from langchain_community.docstore.in_memory import InMemoryDocstore | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.documents import Document | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| ROOT_FOLDER = Path(__file__).resolve().parent.parent | |
| sys.path.append(str(ROOT_FOLDER)) | |
| from src.eda_helpers import get_best_reviews | |
| from src.utils import decode_ratings, extract_image | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Constants | |
| # --------------------------------------------------------------------------- | |
| DEFAULT_EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
| DEFAULT_TOP_REVIEWS = 5 | |
| DEFAULT_TOP_K = 5 | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| def get_embeddings(): | |
| """Initializes and returns a HuggingFaceEmbeddings instance with the specified model and device settings.""" | |
| return HuggingFaceEmbeddings( | |
| model_name=DEFAULT_EMBEDDING_MODEL, | |
| model_kwargs={ | |
| "device": DEVICE, | |
| "model_kwargs": {"torch_dtype": torch.float16}, | |
| }, | |
| encode_kwargs={ | |
| "batch_size": 128 if DEVICE == "cpu" else 512, | |
| "normalize_embeddings": True, | |
| }, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Document construction | |
| # --------------------------------------------------------------------------- | |
| def _format_review(review) -> str: | |
| """Return a concise single-line string for one review.""" | |
| rating = review.get("rating", "?") | |
| title = (review.get("title") or "").strip() | |
| text = (review.get("text") or "").strip() | |
| return f"[{rating}★] {title} — {text}" | |
| def _build_reviews_block( | |
| reviews: Dataset, | |
| parent_asin: str, | |
| k: int = DEFAULT_TOP_REVIEWS, | |
| ) -> str: | |
| """ | |
| Fetch top-k reviews for *parent_asin* and return a formatted text block. | |
| Returns an empty string when no reviews are found. | |
| """ | |
| total, product_reviews = get_best_reviews(reviews, parent_asin, k) | |
| if not product_reviews: | |
| return 0, "" | |
| lines = "\n ".join(_format_review(r) for r in product_reviews) | |
| return total, f"{lines}" | |
| def _build_page_content(product, review_block: str) -> str: | |
| """Assemble the text that will be embedded. Empty sections are omitted.""" | |
| title = (product.get("title") or "").strip() | |
| main_category = (product.get("main_category") or "").strip() | |
| categories = main_category +" >> " + " > ".join(product.get("categories") or []) | |
| features = "\n ".join(product.get("features") or []) | |
| description = " ".join(product.get("description") or []) | |
| details = (product.get("details") or "").strip() | |
| parts = [f"Product: {title}"] | |
| if categories: | |
| parts.append(f"Category Path: {categories}") | |
| if features: | |
| parts.append(f"Features:\n {features}") | |
| if description: | |
| parts.append(f"Description:\n {description}") | |
| if review_block: | |
| parts.append(f"Top Reviews:\n {review_block}") | |
| if details: | |
| parts.append(f"Details:\n {details}") | |
| return "\n".join(parts) | |
| def create_document(product, reviews: Dataset) -> Document | None: | |
| """ | |
| Build a :class:`~langchain_core.documents.Document` from one product row. | |
| Args: | |
| product: A single row from a HuggingFace metadata Dataset (dict-like). | |
| reviews: The full reviews Dataset, forwarded to ``get_best_reviews``. | |
| Returns: | |
| A Document, or ``None`` if the row has no ``parent_asin``. | |
| Notes: | |
| *page_content* contains only the text that influences embeddings. | |
| *metadata* stores structured scalars used for filtering and display | |
| after retrieval — values are kept flat and JSON-serialisable so FAISS | |
| filter expressions work correctly. | |
| """ | |
| parent_asin = product.get("parent_asin") | |
| if not parent_asin: | |
| logger.warning("Skipping product with missing parent_asin: %s", product.get("title")) | |
| return None | |
| tot, review_block = _build_reviews_block(reviews, parent_asin) | |
| page_content = _build_page_content(product, review_block) | |
| metadata = { | |
| # --- identifiers --- | |
| "parent_asin": parent_asin, | |
| # --- numeric (filterable / rankable) --- | |
| "price": product.get("price"), | |
| "average_rating": product.get("average_rating"), | |
| "rating_number": product.get("rating_number"), | |
| # --- categorical (filterable) --- | |
| "main_category": product.get("main_category", ""), | |
| "title": product.get("title", ""), | |
| "image": extract_image(product), | |
| "categories": product.get("categories") or [], | |
| # --- free-form (display only; coerce to str for FAISS compatibility) --- | |
| "details": str(product.get("details") or ""), | |
| "total_reviews": tot | |
| } | |
| return Document(page_content=page_content, metadata=metadata) | |
| # --------------------------------------------------------------------------- | |
| # Vector store | |
| # --------------------------------------------------------------------------- | |
| # Case when we want to create embeddings at once | |
| def build_vector_store( | |
| docs: list[Document], | |
| existing_store: FAISS | None = None, | |
| ) -> FAISS: | |
| """ | |
| Embed *docs* and return (or update) a FAISS vector store. | |
| If ``existing_store`` is provided, documents are added to it. | |
| Otherwise, a new FAISS store is created. | |
| Document IDs are set to ``parent_asin``. | |
| """ | |
| if not docs: | |
| raise ValueError("Cannot build a vector store from an empty document list.") | |
| logger.info("Embedding on %s", DEVICE) | |
| # --- Create new store if needed --- | |
| if existing_store is None: | |
| dim = len(get_embeddings().embed_query("probe")) | |
| index = faiss.IndexFlatL2(dim) | |
| vector_store = FAISS( | |
| embedding_function=get_embeddings(), | |
| index=index, | |
| docstore=InMemoryDocstore(), | |
| index_to_docstore_id={}, | |
| ) | |
| else: | |
| vector_store = existing_store | |
| # --- Add documents --- | |
| uuids = [doc.metadata["parent_asin"] for doc in docs] | |
| vector_store.add_documents(documents=docs, ids=uuids) | |
| logger.info("Indexed %d documents into FAISS.", len(docs)) | |
| return vector_store | |
| # Running the above function in batches and saving | |
| def build_and_save_vector_store( | |
| metadata_dataset: Dataset, | |
| reviews: Dataset, | |
| save_path: str, | |
| batch_size: int = 500, | |
| ) -> FAISS: | |
| """ | |
| Build a FAISS vector store from a metadata Dataset, processing in batches and saving progress. | |
| This function processes the metadata dataset in batches, creating Documents and embedding them into a FAISS vector store. | |
| """ | |
| # --- Resume / initialize --- | |
| if os.path.exists(os.path.join(save_path, "index.faiss")): | |
| vector_store = FAISS.load_local( | |
| save_path, get_embeddings(), allow_dangerous_deserialization=True | |
| ) | |
| already_indexed = set(vector_store.index_to_docstore_id.values()) | |
| print(f"Resuming — {len(already_indexed)} docs already indexed.") | |
| else: | |
| os.makedirs(save_path, exist_ok=True) | |
| vector_store = None # let helper create it | |
| already_indexed = set() | |
| progress_file = os.path.join(save_path, "progress.json") | |
| # --- Resume progress --- | |
| if os.path.exists(progress_file): | |
| with open(progress_file) as f: | |
| resume_start = json.load(f).get("next_start", 0) | |
| print(f"Resuming from row {resume_start}.") | |
| else: | |
| resume_start = 0 | |
| total = len(metadata_dataset) | |
| for start in range(resume_start, total, batch_size): | |
| batch = metadata_dataset.select(range(start, min(start + batch_size, total))) | |
| docs = [] | |
| for row in batch: | |
| doc = create_document(row, reviews) | |
| if doc is not None and doc.metadata["parent_asin"] not in already_indexed: | |
| docs.append(doc) | |
| if docs: | |
| vector_store = build_vector_store( | |
| docs=docs, | |
| existing_store=vector_store, | |
| ) | |
| already_indexed.update(doc.metadata["parent_asin"] for doc in docs) | |
| # --- Save after each batch --- | |
| vector_store.save_local(save_path) | |
| with open(progress_file, "w") as f: | |
| json.dump({"next_start": min(start + batch_size, total)}, f) | |
| print(f"Indexed {min(start + batch_size, total)} / {total} rows") | |
| if os.path.exists(progress_file): | |
| os.remove(progress_file) | |
| return vector_store | |
| # --------------------------------------------------------------------------- | |
| # Search | |
| # --------------------------------------------------------------------------- | |
| def enrich_search_results(vector_store, query: str, k: int, filter=None): | |
| """ | |
| Perform similarity search and enrich results with HuggingFace dataset metadata. | |
| Args: | |
| vector_store: LangChain vector store instance | |
| query: Search query string | |
| k: Number of results to return | |
| filter: Filter dict for similarity search | |
| Returns: | |
| List of enriched metadata objects as dicts | |
| """ | |
| results = vector_store.similarity_search_with_score(query, k=k, filter=filter) | |
| enriched_results = [] | |
| for doc, score in results: | |
| metadata_object = {**doc.metadata} # start with all doc metadata | |
| metadata_object['score'] = float(score) | |
| metadata_object['reviews'] = decode_ratings(doc.page_content) or [] | |
| enriched_results.append(metadata_object) | |
| return [json.loads(json.dumps(obj, default=str)) for obj in enriched_results] | |
| # --------------------------------------------------------------------------- | |
| # Read existing vector store | |
| # --------------------------------------------------------------------------- | |
| def load_vector_store( | |
| load_path: str, | |
| ) -> FAISS: | |
| """Load a FAISS vector store from disk.""" | |
| return FAISS.load_local( | |
| load_path, | |
| embeddings=get_embeddings(), | |
| allow_dangerous_deserialization=True, | |
| ) |