Spaces:
Sleeping
Sleeping
"""
src/bm25.py — BM25 keyword retrieval

Uses LangChain's BM25Retriever with the custom tokenizer from utils.py.

Document schema (one LangChain Document per product):
    page_content : text BM25 scores against =
                   title + features + description + categories +
                   details (flattened) + store + top-k review titles & texts
    metadata     : structured fields for display in app.py
                   (parent_asin, title, main_category, price, store,
                   categories, features, description, details,
                   average_rating, rating_number, image, top_reviews)

Data source expected: HuggingFace Dataset objects as loaded in
milestone1_exploration.ipynb via load_dataset("McAuley-Lab/Amazon-Reviews-2023", ...)
OR the saved .jsonl subsets in data/raw/.
"""
| import json | |
| import pickle | |
| from pathlib import Path | |
| from typing import Any | |
| import sys | |
| from datasets import Dataset | |
| from langchain_community.retrievers import BM25Retriever | |
| from langchain_core.documents import Document | |
| ROOT_FOLDER = Path(__file__).resolve().parent.parent | |
| sys.path.append(str(ROOT_FOLDER)) | |
| from src.utils import simple_tokenize, extract_image | |
| from src.eda_helpers import get_best_reviews | |
# ── field helpers ─────────────────────────────────────────────────────────────
| def _coerce_str(value: Any) -> str: | |
| """Safely flatten any metadata field to a plain string.""" | |
| if value is None: | |
| return "" | |
| if isinstance(value, list): | |
| return " ".join(_coerce_str(v) for v in value) | |
| if isinstance(value, dict): | |
| return " ".join(f"{k} {_coerce_str(v)}" for k, v in value.items()) | |
| s = str(value) | |
| # treat the literal string "None" as empty | |
| return "" if s.strip().lower() == "none" else s | |
| def _parse_details(details: Any) -> dict: | |
| """ | |
| 'details' in this dataset is stored as a JSON string, e.g.: | |
| '{"Brand": "Luzianne", "Item Form": "Ground", ...}' | |
| Parse it safely; return an empty dict on failure. | |
| """ | |
| if not details: | |
| return {} | |
| if isinstance(details, dict): | |
| return details | |
| try: | |
| return json.loads(str(details)) | |
| except (json.JSONDecodeError, TypeError): | |
| return {} | |
| def _parse_price(price: Any) -> float | None: | |
| """price can be a float, an int, or the string 'None'.""" | |
| if price is None: | |
| return None | |
| try: | |
| v = float(price) | |
| return None if v != v else v # NaN guard | |
| except (ValueError, TypeError): | |
| return None | |
# ── review selection ──────────────────────────────────────────────────────────
def get_top_reviews(
    reviews_dataset_dict,
    parent_asin: str,
    k: int = 5,
) -> list[dict]:
    """
    Return up to k of the best reviews for one product.

    Ranking is delegated to get_best_reviews() from eda_helpers.py
    (weighted score: helpful_vote 50%, verified_purchase 30%,
    rating extremity 20%).

    Parameters
    ----------
    reviews_dataset_dict : the full reviews DatasetDict (raw_reviews) —
                           NOT the pre-selected 'full' split, because
                           get_best_reviews() selects 'full' internally.
    parent_asin          : product identifier
    k                    : number of reviews to return

    Returns
    -------
    List of dicts with keys: title, text, rating, helpful_vote.
    title and text fall back to "" when missing or falsy.
    """
    best = get_best_reviews(reviews_dataset_dict, parent_asin, top_k=k)

    # With top_k set, get_best_reviews() hands back (total_count, Dataset);
    # when nothing matches it returns a bare, empty Dataset instead.
    if isinstance(best, tuple):
        _total, reviews = best
    else:
        reviews = best

    if len(reviews) == 0:
        return []

    def as_record(review):
        return {
            "title": review.get("title", "") or "",
            "text": review.get("text", "") or "",
            "rating": review.get("rating"),
            "helpful_vote": review.get("helpful_vote", 0),
        }

    return [as_record(review) for review in reviews]
# ── document construction ─────────────────────────────────────────────────────
def format_review(review: dict) -> str:
    """Render one review the same way as in the notebook.

    Produces "Review (Rating: <rating>): <title>. <text>" followed by a newline
    and a single space. Raises KeyError if 'rating', 'title' or 'text' is absent.
    """
    rating = review["rating"]
    title = review["title"]
    text = review["text"]
    return f"Review (Rating: {rating}): {title}. {text}\n "
def _join_field(value: Any, sep: str) -> str:
    """Join a list-valued metadata field with *sep*; a plain string passes through whole."""
    if not value:
        return ""
    if isinstance(value, str):
        # Never iterate a bare string: sep.join("abc") would split it into
        # single characters and corrupt the indexed text.
        return value
    if isinstance(value, (list, tuple)):
        # str.join raises TypeError on non-str items, so stringify them (None -> "").
        return sep.join("" if item is None else str(item) for item in value)
    return str(value)


def build_page_content(product: dict, top_reviews: list[dict]) -> str:
    """
    Build the page_content string that BM25 will index.

    Mirrors the create_document() structure in milestone1_exploration.ipynb:
    one labelled section per line for title, categories, store, features,
    description, flattened details and the given top reviews.

    Parameters
    ----------
    product     : one metadata row. The list fields (description, features,
                  categories) may also arrive as plain strings or contain None.
    top_reviews : review dicts as accepted by format_review()

    Returns
    -------
    The newline-separated text to be tokenized and indexed.
    """
    title = _coerce_str(product.get("title"))
    description = _join_field(product.get("description"), " ")
    features = _join_field(product.get("features"), "\n")
    categories = _join_field(product.get("categories"), " > ")
    store = _coerce_str(product.get("store"))
    details = _parse_details(product.get("details"))
    details_str = " ".join(f"{k}: {v}" for k, v in details.items())
    review_lines = "".join(format_review(r) for r in top_reviews)

    sections = [
        f"Product: {title}",
        f"Category: {categories}",
        f"Store: {store}",
        "Features:",
        features,
        "Description:",
        description,
        "Details:",
        details_str,
        f"Top Reviews (showing {len(top_reviews)}):",
        review_lines,
    ]
    return "\n".join(sections)
def _has_indexable_text(product: dict, details: dict, top_reviews: list[dict]) -> bool:
    """Return True if any field that build_page_content() indexes is non-empty."""
    if details:
        return True
    for field in ("title", "features", "description", "categories", "store"):
        if _coerce_str(product.get(field)).strip():
            return True
    return any(
        _coerce_str(review.get("title")).strip() or _coerce_str(review.get("text")).strip()
        for review in top_reviews
    )


def build_document(product: dict, top_reviews: list[dict]) -> Document | None:
    """
    Build one LangChain Document for a single product row from the metadata Dataset.

    Parameters
    ----------
    product     : one metadata row (dict-like)
    top_reviews : review dicts (keys title, text, rating, helpful_vote) as
                  produced by get_top_reviews() or pregroup_reviews()

    Returns
    -------
    The Document, or None if the product has no indexable text — i.e. its
    title, features, description, categories, store, details and reviews are
    all empty. (build_page_content() always emits section labels such as
    "Product:", so checking its output for blankness can never succeed.)
    """
    details_dict = _parse_details(product.get("details"))
    if not _has_indexable_text(product, details_dict, top_reviews):
        return None

    page_content = build_page_content(product, top_reviews)
    metadata = {
        "parent_asin": product.get("parent_asin", ""),
        "title": _coerce_str(product.get("title")),
        "main_category": _coerce_str(product.get("main_category")),
        "price": _parse_price(product.get("price")),
        "store": _coerce_str(product.get("store")),
        "categories": _coerce_str(product.get("categories")),
        "features": _coerce_str(product.get("features")),
        "description": _coerce_str(product.get("description")),
        "details": details_dict,
        "average_rating": product.get("average_rating"),
        "rating_number": product.get("rating_number"),
        "image": extract_image(product),
        "top_reviews": top_reviews,
    }
    return Document(page_content=page_content, metadata=metadata)
def pregroup_reviews(
    reviews_dataset_dict,
    max_reviews_per_product: int = 5,
) -> dict:
    """
    Pre-group the top-k reviews per product using DuckDB for efficient scoring
    and ranking — the full review table is never loaded into Python; only the
    selected top-k rows per product are fetched.

    A single SQL query with ROW_NUMBER() ranks reviews per product by the
    same weighted score as eda_helpers.get_best_reviews():
        helpful_vote 50% (log-scaled) + verified_purchase 30% + rating extremity 20%
    Ties on score are broken by title, then text, so repeated runs (e.g. a
    resumed batched build) select the same reviews.

    Parameters
    ----------
    reviews_dataset_dict    : reviews DatasetDict; its 'full' split is scanned
    max_reviews_per_product : k, the number of reviews kept per product

    Returns
    -------
    dict mapping parent_asin -> list of review dicts (keys: title, text,
    rating, helpful_vote), best-scored first. NULL title/text become "",
    matching get_top_reviews(), so format_review() never renders "None".
    """
    import duckdb

    print("Pre-grouping reviews via DuckDB (memory-efficient) ...")
    # DuckDB resolves the name `arrow_table` in the SQL below to this local
    # variable (replacement scan), so the variable name must stay in sync.
    # NOTE(review): `.data.table` appears to be the raw Arrow table and may
    # ignore an indices mapping left by select()/filter()/shuffle() — pass an
    # unfiltered split, or confirm.
    arrow_table = reviews_dataset_dict["full"].data.table
    # int() guarantees only a plain integer is ever interpolated into the SQL.
    k = int(max_reviews_per_product)

    query = f"""
        WITH scored AS (
            SELECT
                parent_asin,
                COALESCE(title, '') AS title,
                COALESCE(text, '') AS text,
                rating,
                helpful_vote,
                verified_purchase,
                (
                    0.5 * (LN(1 + GREATEST(COALESCE(helpful_vote, 0), 0)))
                    + 0.3 * (CASE WHEN verified_purchase THEN 1.0 ELSE 0.0 END)
                    + 0.2 * (ABS(COALESCE(rating, 3.0) - 3.0) / 2.0)
                ) AS score
            FROM arrow_table
            WHERE parent_asin IS NOT NULL AND parent_asin != ''
        ),
        ranked AS (
            SELECT *,
                ROW_NUMBER() OVER (
                    PARTITION BY parent_asin
                    ORDER BY score DESC, title, text
                ) AS rn
            FROM scored
        )
        SELECT parent_asin, title, text, rating, helpful_vote
        FROM ranked
        WHERE rn <= {k}
        ORDER BY parent_asin, rn
    """
    rows = duckdb.query(query).fetchall()

    grouped: dict[str, list[dict]] = {}
    for asin, title, text, rating, helpful_vote in rows:
        grouped.setdefault(asin, []).append(
            {"title": title, "text": text, "rating": rating, "helpful_vote": helpful_vote}
        )

    print(f" {len(grouped):,} unique parent_asins grouped")
    print(" pre-grouping done")
    return grouped
def build_documents(
    metadata_dataset: Dataset,
    reviews_dataset_dict,
    max_products: int | None = None,
    max_reviews_per_product: int = 5,
    reviews_lookup: dict | None = None,
) -> list[Document]:
    """
    Build one LangChain Document per product.

    Parameters
    ----------
    metadata_dataset        : product metadata rows, processed in order
    reviews_dataset_dict    : reviews DatasetDict; only used when
                              reviews_lookup is None (per-product lookup via
                              get_top_reviews())
    max_products            : process only the first N rows; None = all rows,
                              0 or a negative value = no rows
    max_reviews_per_product : number of reviews embedded per product
    reviews_lookup          : output of pregroup_reviews(); pass it to skip
                              per-product review queries entirely — much
                              faster for large datasets

    Returns
    -------
    One Document per product that build_document() did not skip as empty.
    """
    from itertools import islice

    total = len(metadata_dataset)
    # Only None means "everything": an explicit 0 must not silently index all rows.
    n = total if max_products is None else max(0, min(total, max_products))
    print(f"Building documents for {n} products ...")

    docs = []
    # Stream the first n rows in order instead of indexing row by row.
    for i, product in enumerate(islice(metadata_dataset, n)):
        parent_asin = product.get("parent_asin", "")
        if reviews_lookup is not None:
            top_reviews = reviews_lookup.get(parent_asin, [])[:max_reviews_per_product]
        else:
            top_reviews = get_top_reviews(
                reviews_dataset_dict, parent_asin, k=max_reviews_per_product
            )

        doc = build_document(product, top_reviews)
        if doc is not None:
            docs.append(doc)

        if (i + 1) % 500 == 0:
            print(f" ... {i + 1}/{n} products processed")

    print(f" -> {len(docs)} documents built (skipped {n - len(docs)} empty)")
    return docs
# ── index build & persist ─────────────────────────────────────────────────────
def build_and_save(
    documents: list[Document],
    index_path: str | Path = "data/processed/bm25_index.pkl",
    corpus_path: str | Path = "data/processed/bm25_corpus.pkl",
) -> BM25Retriever:
    """
    Build a BM25Retriever from documents, then pickle both the
    tokenized corpus and the retriever to disk.

    Parameters
    ----------
    documents   : output of build_documents()
    index_path  : e.g. 'data/processed/bm25_index.pkl'
    corpus_path : e.g. 'data/processed/bm25_corpus.pkl'

    Returns
    -------
    The fitted BM25Retriever instance.

    Raises
    ------
    ValueError : if documents is empty (there is nothing to index).
    """
    if not documents:
        raise ValueError("build_and_save() received no documents to index.")

    index_path = Path(index_path)
    corpus_path = Path(corpus_path)
    # The two paths may live in different folders; create both.
    for folder in {index_path.parent, corpus_path.parent}:
        folder.mkdir(parents=True, exist_ok=True)

    print(f"Fitting BM25 index over {len(documents)} documents ...")
    retriever = BM25Retriever.from_documents(
        documents,
        preprocess_func=simple_tokenize,
    )

    # Save the tokenized corpus separately — useful for inspection in the notebook.
    tokenized_corpus = [simple_tokenize(doc.page_content) for doc in documents]
    with open(corpus_path, "wb") as f:
        pickle.dump(tokenized_corpus, f)
    print(f"Tokenized corpus saved -> {corpus_path}")

    with open(index_path, "wb") as f:
        pickle.dump(retriever, f)
    print(f"BM25 index saved -> {index_path}")

    return retriever
# ── load ──────────────────────────────────────────────────────────────────────
def load(index_path: str | Path = "data/processed/bm25_index.pkl") -> BM25Retriever:
    """
    Load a previously saved BM25Retriever from disk.
    Call this in app.py instead of rebuilding every time.

    Note: the default path differs from the default used by
    build_from_hf_datasets() ('data/processed/tokenisation/bm25_index.pkl');
    pass the path explicitly when loading an index built with those defaults.

    Raises
    ------
    FileNotFoundError : no file exists at index_path
    TypeError         : the file does not contain a BM25Retriever (e.g. the
                        tokenized-corpus pickle was passed by mistake)
    """
    index_path = Path(index_path)
    if not index_path.exists():
        raise FileNotFoundError(
            f"BM25 index not found at '{index_path}'.\n"
            "Run build_and_save() from your notebook first."
        )

    # SECURITY: pickle.load can execute arbitrary code — only load index
    # files produced by build_and_save() or another trusted source.
    with open(index_path, "rb") as f:
        retriever = pickle.load(f)

    if not isinstance(retriever, BM25Retriever):
        raise TypeError(
            f"Expected a BM25Retriever in '{index_path}', "
            f"got {type(retriever).__name__}."
        )

    print(f"BM25 index loaded -> {index_path}")
    return retriever
# ── search ────────────────────────────────────────────────────────────────────
def search(
    retriever: "BM25Retriever",
    query: str,
    top_k: int = 3,
) -> list[dict]:
    """
    Search the BM25Retriever for a query, returning metadata of top-k results.

    The query is tokenized with the retriever's own preprocess_func (the same
    tokenizer the index was built with). BM25 scores are computed for every
    indexed document, and the metadata of the top_k best matches is returned,
    highest score first. Among equal scores, earlier documents come first.

    The retriever is not modified (its ``k`` is left untouched), so a shared
    retriever can safely be used elsewhere.

    Parameters
    ----------
    retriever : fitted BM25Retriever (e.g. from load())
    query     : free-text query
    top_k     : number of results; values <= 0 return []

    Returns
    -------
    List of metadata dicts, each a copy with an added "score" (Python float).
    """
    import heapq

    tokenized_query = retriever.preprocess_func(query)
    # Raw BM25 scores for ALL documents, one per entry in retriever.docs.
    scores = retriever.vectorizer.get_scores(tokenized_query)

    # nlargest == sorted(..., reverse=True)[:top_k] (same tie order), but runs
    # in O(n log k) and returns [] for top_k <= 0 instead of slicing oddly.
    top_indices = heapq.nlargest(top_k, range(len(scores)), key=scores.__getitem__)

    return [
        {**retriever.docs[i].metadata, "score": float(scores[i])}
        for i in top_indices
    ]
# ── notebook entry point ──────────────────────────────────────────────────────
def build_from_hf_datasets(
    metadata_dataset: Dataset,
    reviews_dataset_dict,
    index_path: str | Path = "data/processed/tokenisation/bm25_index.pkl",
    corpus_path: str | Path = "data/processed/tokenisation/bm25_corpus.pkl",
    max_products: int | None = None,
    max_reviews_per_product: int = 5,
) -> BM25Retriever:
    """
    End-to-end helper to call from milestone1_exploration.ipynb.

    Steps: pre-group the top reviews for every product once
    (pregroup_reviews), build one Document per product (build_documents),
    then fit and pickle the BM25 index and tokenized corpus (build_and_save).

    Parameters
    ----------
    metadata_dataset        : product metadata rows (e.g. raw_metadata['full'])
    reviews_dataset_dict    : reviews DatasetDict (e.g. raw_reviews)
    index_path, corpus_path : where the pickles are written
    max_products            : index only the first N products; None = all
    max_reviews_per_product : reviews embedded per product

    Example usage in the notebook:
    --------------------------------
        from src.bm25 import build_from_hf_datasets, load, search
        retriever = build_from_hf_datasets(
            metadata_dataset=raw_metadata['full'],
            reviews_dataset_dict=raw_reviews,
            max_products=500,
        )
        # Later in app.py — just load the saved index. Pass the path this
        # function wrote to; load()'s own default points elsewhere:
        # retriever = load("data/processed/tokenisation/bm25_index.pkl")
        # results = search(retriever, "something sweet for a cheese board")
    """
    reviews_lookup = pregroup_reviews(reviews_dataset_dict, max_reviews_per_product)
    docs = build_documents(
        metadata_dataset,
        reviews_dataset_dict,
        max_products=max_products,
        max_reviews_per_product=max_reviews_per_product,
        reviews_lookup=reviews_lookup,
    )
    return build_and_save(docs, index_path=index_path, corpus_path=corpus_path)
def _list_checkpoints(checkpoint_dir: Path) -> list[tuple[int, Path]]:
    """Return (start_index, path) for each docs_<start>.pkl checkpoint, sorted numerically."""
    found = []
    for path in checkpoint_dir.glob("docs_*.pkl"):
        suffix = path.stem[len("docs_"):]
        # Ignore stray files such as docs_old.pkl instead of crashing on int().
        if suffix.isdecimal():
            found.append((int(suffix), path))
    # Numeric sort: lexicographic order would put docs_10000 before docs_2000.
    return sorted(found)


def _write_checkpoint(path: Path, docs: list[Document]) -> None:
    """Pickle docs to path atomically (temp file + rename) so a crash never leaves a truncated checkpoint."""
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "wb") as f:
        pickle.dump(docs, f)
    tmp_path.replace(path)


def build_from_hf_datasets_batched(
    metadata_dataset: Dataset,
    reviews_dataset_dict,
    index_path: str | Path = "data/processed/tokenisation/bm25_index.pkl",
    corpus_path: str | Path = "data/processed/tokenisation/bm25_corpus.pkl",
    batch_size: int = 2000,
    max_reviews_per_product: int = 5,
    max_products: int | None = None,
) -> BM25Retriever:
    """
    Memory-safe version of build_from_hf_datasets — builds documents in
    batches to avoid OOM kernel crashes on large datasets.

    After each batch the batch's documents are checkpointed to
    <index_path's folder>/checkpoints/docs_<start>.pkl (written atomically),
    so if the kernel dies mid-run you can resume from the last completed
    batch instead of starting over. Checkpoints are deleted once the final
    index has been written.

    Resuming assumes the same metadata_dataset, max_products and batch_size
    as the run that wrote the checkpoints; this is not verified.

    Parameters
    ----------
    batch_size   : products per batch / checkpoint; must be positive
    max_products : process only the first N products; None = all,
                   0 or a negative value = none

    Raises
    ------
    ValueError : if batch_size is not positive.

    Example usage in the notebook:
    --------------------------------
        retriever = build_from_hf_datasets_batched(
            metadata_dataset=raw_metadata['full'],
            reviews_dataset_dict=raw_reviews,
            batch_size=5000,
            max_reviews_per_product=3,
            max_products=60000,   # None = use all
        )
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    index_path = Path(index_path)
    corpus_path = Path(corpus_path)

    # checkpoint folder lives next to the index
    checkpoint_dir = index_path.parent / "checkpoints"
    checkpoint_dir.mkdir(parents=True, exist_ok=True)

    n_available = len(metadata_dataset)
    total = n_available if max_products is None else max(0, min(n_available, max_products))

    # find resume point — the batch after the highest-numbered checkpoint
    existing = _list_checkpoints(checkpoint_dir)
    all_docs = []
    if existing:
        resume_start = existing[-1][0] + batch_size
        print(f"Resuming from product {resume_start} "
              f"({len(existing)} checkpoint(s) found)")
        for _start, ckpt in existing:
            with open(ckpt, "rb") as f:
                all_docs.extend(pickle.load(f))
        print(f" loaded {len(all_docs)} docs from checkpoints")
    else:
        resume_start = 0
        print(f"Starting fresh - {total} products to process")

    # pre-group all reviews once, but only if any batch remains to be built
    reviews_lookup = None
    if resume_start < total:
        reviews_lookup = pregroup_reviews(reviews_dataset_dict, max_reviews_per_product)

    # batch loop
    for start in range(resume_start, total, batch_size):
        end = min(start + batch_size, total)
        print(f"\nBatch {start}-{end} of {total} ...")

        batch = metadata_dataset.select(range(start, end))
        batch_docs = build_documents(
            batch,
            reviews_dataset_dict,
            max_products=None,
            max_reviews_per_product=max_reviews_per_product,
            reviews_lookup=reviews_lookup,
        )
        all_docs.extend(batch_docs)

        ckpt_path = checkpoint_dir / f"docs_{start}.pkl"
        _write_checkpoint(ckpt_path, batch_docs)
        print(f" checkpoint saved -> {ckpt_path.name}")
        print(f" cumulative docs : {len(all_docs)}")

    # build final index
    print(f"\nAll batches done - {len(all_docs)} total documents.")
    retriever = build_and_save(all_docs, index_path=index_path, corpus_path=corpus_path)

    # clean up checkpoints (and any leftover .tmp files) now that the final
    # index is safely written
    for ckpt in checkpoint_dir.glob("docs_*.pkl*"):
        ckpt.unlink()
    print("Checkpoints cleaned up.")

    return retriever