# File: amazon_retriever/src/semantic.py
# Last commit by: github-actions[bot]
# Commit message: chore: sync app/ and src/ from GitHub
# Commit: 251d75e
"""
semantic.py
-----------
Semantic search over an Amazon product catalogue using FAISS + HuggingFace embeddings.

Expected inputs
---------------
- metadata_dataset : datasets.Dataset — one row per product (e.g. raw_metadata["full"])
- reviews_dataset  : datasets.Dataset — forwarded to get_best_reviews(reviews, asin, k)

Typical usage
-------------
    # Build the index in resumable, checkpointed batches, then reload it later:
    store = build_and_save_vector_store(raw_metadata["full"], raw_reviews, save_path="faiss_index")
    store = load_vector_store("faiss_index")

    # Query: returns JSON-safe dicts of product metadata + score + decoded reviews.
    results = enrich_search_results(store, "noise cancelling headphones", k=5)
"""
import logging
from typing import Any
import streamlit as st
import torch
import json, os, sys
from pathlib import Path
import faiss
from datasets import Dataset
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings
ROOT_FOLDER = Path(__file__).resolve().parent.parent
sys.path.append(str(ROOT_FOLDER))
from src.eda_helpers import get_best_reviews
from src.utils import decode_ratings, extract_image
# Module-level logger; progress/warnings from this module go through it.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Sentence-transformers model loaded by get_embeddings().
DEFAULT_EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# Default number of reviews per product folded into the embedded text
# (the default `k` of _build_reviews_block).
DEFAULT_TOP_REVIEWS = 5
# Not referenced in this module; presumably the default number of search
# results for callers of enrich_search_results — confirm against the app.
DEFAULT_TOP_K = 5
# Resolved once at import time: "cuda" when torch can see a GPU, else "cpu".
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
@st.cache_resource(show_spinner=False)
def get_embeddings():
    """
    Return the shared HuggingFaceEmbeddings instance used for indexing and querying.

    Cached with ``st.cache_resource`` so the model is loaded once and reused.
    Embeddings are L2-normalised (``normalize_embeddings=True``).

    Half precision (float16) is only requested on CUDA. On CPU, float16 matmuls
    are either unsupported (older PyTorch raises "... not implemented for 'Half'")
    or much slower than float32, so CPU runs stay in float32.
    """
    on_gpu = DEVICE == "cuda"
    return HuggingFaceEmbeddings(
        model_name=DEFAULT_EMBEDDING_MODEL,
        model_kwargs={
            "device": DEVICE,
            # Forwarded by sentence-transformers to the underlying HF model.
            "model_kwargs": {"torch_dtype": torch.float16 if on_gpu else torch.float32},
        },
        encode_kwargs={
            "batch_size": 512 if on_gpu else 128,
            "normalize_embeddings": True,
        },
    )
# ---------------------------------------------------------------------------
# Document construction
# ---------------------------------------------------------------------------
def _format_review(review) -> str:
"""Return a concise single-line string for one review."""
rating = review.get("rating", "?")
title = (review.get("title") or "").strip()
text = (review.get("text") or "").strip()
return f"[{rating}★] {title}{text}"
def _build_reviews_block(
    reviews: Dataset,
    parent_asin: str,
    k: int = DEFAULT_TOP_REVIEWS,
) -> tuple[int, str]:
    """
    Fetch the top-*k* reviews for *parent_asin* and format them as a text block.

    Args:
        reviews: Reviews Dataset, forwarded unchanged to ``get_best_reviews``.
        parent_asin: Product identifier to look up.
        k: Maximum number of reviews requested from ``get_best_reviews``.

    Returns:
        A ``(total, block)`` tuple. ``total`` is the first value returned by
        ``get_best_reviews`` (presumably the product's total review count —
        verify in ``src.eda_helpers``). ``block`` holds one formatted review per
        line, lines separated by a newline followed by one space.
        When no reviews come back, ``(0, "")`` is returned.
    """
    total, product_reviews = get_best_reviews(reviews, parent_asin, k)
    if not product_reviews:
        return 0, ""
    block = "\n ".join(_format_review(r) for r in product_reviews)
    return total, block
def _build_page_content(product, review_block: str) -> str:
"""Assemble the text that will be embedded. Empty sections are omitted."""
title = (product.get("title") or "").strip()
main_category = (product.get("main_category") or "").strip()
categories = main_category +" >> " + " > ".join(product.get("categories") or [])
features = "\n ".join(product.get("features") or [])
description = " ".join(product.get("description") or [])
details = (product.get("details") or "").strip()
parts = [f"Product: {title}"]
if categories:
parts.append(f"Category Path: {categories}")
if features:
parts.append(f"Features:\n {features}")
if description:
parts.append(f"Description:\n {description}")
if review_block:
parts.append(f"Top Reviews:\n {review_block}")
if details:
parts.append(f"Details:\n {details}")
return "\n".join(parts)
def create_document(product, reviews: Dataset) -> Document | None:
    """
    Turn one product metadata row into a LangChain ``Document``.

    Args:
        product: Dict-like row from the HuggingFace metadata Dataset.
        reviews: Full reviews Dataset; handed to the review-block builder,
            which calls ``get_best_reviews``.

    Returns:
        The ``Document``, or ``None`` (after logging a warning) when the row
        has no ``parent_asin``.

    Notes:
        ``page_content`` carries only the text that is embedded.
        ``metadata`` carries values used after retrieval for filtering and
        display. These are scalars plus the ``categories`` list, with
        ``details`` coerced to ``str``.
    """
    asin = product.get("parent_asin")
    if not asin:
        logger.warning("Skipping product with missing parent_asin: %s", product.get("title"))
        return None

    review_count, reviews_text = _build_reviews_block(reviews, asin)
    content = _build_page_content(product, reviews_text)

    meta = dict(
        # identifiers
        parent_asin=asin,
        # numeric (filterable / rankable)
        price=product.get("price"),
        average_rating=product.get("average_rating"),
        rating_number=product.get("rating_number"),
        # categorical (filterable)
        main_category=product.get("main_category", ""),
        title=product.get("title", ""),
        image=extract_image(product),
        categories=product.get("categories") or [],
        # free-form, display only
        details=str(product.get("details") or ""),
        total_reviews=review_count,
    )
    return Document(page_content=content, metadata=meta)
# ---------------------------------------------------------------------------
# Vector store
# ---------------------------------------------------------------------------
# Case when we want to create embeddings at once
def build_vector_store(
    docs: list[Document],
    existing_store: FAISS | None = None,
) -> FAISS:
    """
    Embed *docs* into a FAISS store and return that store.

    Args:
        docs: Documents to index. Each must carry ``metadata["parent_asin"]``,
            which becomes its docstore id. LangChain's FAISS rejects ids that
            repeat or already exist in the store.
        existing_store: Store to append to. When ``None``, a fresh store is
            created around an exact (brute-force) ``IndexFlatL2`` sized to the
            embedding model's output dimension.

    Returns:
        The store the documents were added to (``existing_store`` itself when
        one was given).

    Raises:
        ValueError: If *docs* is empty.
    """
    if not docs:
        raise ValueError("Cannot build a vector store from an empty document list.")

    logger.info("Embedding on %s", DEVICE)

    store = existing_store
    if store is None:
        embedder = get_embeddings()
        # Embed a throwaway string once to discover the vector width.
        width = len(embedder.embed_query("probe"))
        store = FAISS(
            embedding_function=embedder,
            index=faiss.IndexFlatL2(width),
            docstore=InMemoryDocstore(),
            index_to_docstore_id={},
        )

    store.add_documents(
        documents=docs,
        ids=[d.metadata["parent_asin"] for d in docs],
    )
    logger.info("Indexed %d documents into FAISS.", len(docs))
    return store
# Running the above function in batches and saving
def build_and_save_vector_store(
    metadata_dataset: Dataset,
    reviews: Dataset,
    save_path: str,
    batch_size: int = 500,
) -> FAISS | None:
    """
    Build a FAISS vector store from *metadata_dataset* in batches, checkpointing to disk.

    After every batch the store (if it changed) is saved to *save_path*, and
    the next row offset is written to ``progress.json`` in that directory.
    This lets an interrupted run be restarted with the same arguments and
    resume. Products whose ``parent_asin`` is already in the store are skipped,
    which also avoids duplicate-id errors when a product appears twice.

    Args:
        metadata_dataset: One row per product.
        reviews: Reviews Dataset forwarded to ``create_document``.
        save_path: Directory holding ``index.faiss`` and ``progress.json``.
        batch_size: Rows per batch; must be positive.

    Returns:
        The populated store, or ``None`` when no index existed on disk and no
        document could be built (e.g. empty dataset, or every row lacking
        ``parent_asin``).

    Raises:
        ValueError: If *batch_size* is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    index_file = os.path.join(save_path, "index.faiss")
    progress_file = os.path.join(save_path, "progress.json")

    # --- Resume / initialise the store ---
    if os.path.exists(index_file):
        vector_store = load_vector_store(save_path)
        already_indexed = set(vector_store.index_to_docstore_id.values())
        print(f"Resuming — {len(already_indexed)} docs already indexed.")
    else:
        os.makedirs(save_path, exist_ok=True)
        vector_store = None  # build_vector_store creates it on the first non-empty batch
        already_indexed = set()

    # --- Resume row offset ---
    # Only trust the checkpoint when its index was actually loaded. If the
    # index files were removed but progress.json was left behind, honouring
    # the offset would silently skip rows that were never indexed.
    resume_start = 0
    if vector_store is not None and os.path.exists(progress_file):
        with open(progress_file, encoding="utf-8") as f:
            resume_start = json.load(f).get("next_start", 0)
        print(f"Resuming from row {resume_start}.")

    total = len(metadata_dataset)
    for start in range(resume_start, total, batch_size):
        stop = min(start + batch_size, total)
        batch = metadata_dataset.select(range(start, stop))

        docs = []
        for row in batch:
            # Cheap pre-check before create_document pays for the review lookup.
            if row.get("parent_asin") in already_indexed:
                continue
            doc = create_document(row, reviews)
            if doc is None:
                continue
            # Record immediately so a parent_asin repeated within this batch is
            # not submitted twice (FAISS rejects duplicate ids).
            already_indexed.add(doc.metadata["parent_asin"])
            docs.append(doc)

        if docs:
            vector_store = build_vector_store(docs=docs, existing_store=vector_store)
            vector_store.save_local(save_path)

        # --- Checkpoint after each batch (only once an index exists on disk) ---
        if vector_store is not None:
            with open(progress_file, "w", encoding="utf-8") as f:
                json.dump({"next_start": stop}, f)
        print(f"Indexed {stop} / {total} rows")

    # Finished: drop the checkpoint so a later call rescans from row 0
    # (already-indexed products are skipped cheaply).
    if os.path.exists(progress_file):
        os.remove(progress_file)
    if vector_store is None:
        logger.warning("No documents were indexed; nothing saved to %s.", save_path)
    return vector_store
# ---------------------------------------------------------------------------
# Search
# ---------------------------------------------------------------------------
def enrich_search_results(vector_store, query: str, k: int, filter=None):
    """
    Run a scored similarity search and return JSON-safe result records.

    Each record is a copy of the hit's ``Document.metadata`` with two extra keys:

    * ``score`` — the raw score from ``similarity_search_with_score`` as a
      float. For stores built by ``build_vector_store`` (``IndexFlatL2``) this
      is an L2 distance, so lower means more similar.
    * ``reviews`` — ``decode_ratings(page_content)``, or ``[]`` when that
      returns a falsy value.

    Args:
        vector_store: LangChain vector store exposing ``similarity_search_with_score``.
        query: Free-text search query.
        k: Number of hits to request.
        filter: Optional metadata filter forwarded to the search.

    Returns:
        A list of dicts, round-tripped through JSON (non-serialisable values
        become ``str``).
    """
    hits = vector_store.similarity_search_with_score(query, k=k, filter=filter)

    def _to_record(doc, score):
        record = dict(doc.metadata)
        record['score'] = float(score)
        record['reviews'] = decode_ratings(doc.page_content) or []
        return record

    records = [_to_record(doc, score) for doc, score in hits]
    # Round-trip through JSON so callers only ever see plain JSON types.
    return [json.loads(json.dumps(record, default=str)) for record in records]
# ---------------------------------------------------------------------------
# Read existing vector store
# ---------------------------------------------------------------------------
def load_vector_store(
    load_path: str,
) -> FAISS:
    """
    Read a FAISS vector store previously written with ``save_local``.

    The store is reattached to the shared model from ``get_embeddings()``.
    This assumes the index was built with that same model, so that query and
    document vectors are comparable.

    Security:
        ``allow_dangerous_deserialization=True`` lets LangChain unpickle the
        docstore saved next to the index. Only point *load_path* at
        directories you created or otherwise trust.
    """
    embedder = get_embeddings()
    store = FAISS.load_local(
        load_path,
        embeddings=embedder,
        allow_dangerous_deserialization=True,
    )
    return store