# amazon_retriever/src/eda_helpers.py
# Sarisha Das
# rename all folders, fix docker path
# dfdc8a2
from datasets import Dataset
import duckdb
def dataset_overview(dataset_dict) -> None:
    """Print a short, human-readable summary of every split in a DatasetDict.

    A banner is printed first. Then, for each split, its name and row count
    are shown, followed by a two-column listing of feature names and their
    types. Nothing is returned; all output goes to stdout.
    """
    banner = "=" * 60
    print("\n" + banner)
    print(" Overview")
    print(banner)

    # Column header and separator are identical for every split.
    header = " {:<30} {}".format("Field", "dtype")
    separator = " " + "-" * 45

    for split_name, split_data in dataset_dict.items():
        row_count = format(split_data.num_rows, ",")
        print("\n Split : {!r} ({} rows)".format(split_name, row_count))
        print(header)
        print(separator)
        for field_name, field_type in split_data.features.items():
            print(" {} {}".format(format(field_name, "<30"), field_type))

    # Trailing blank line to separate the overview from later output.
    print()
def get_reviews_by_asin(
    reviews_dataset,
    parent_asin: str,
):
    """
    Retrieve all reviews matching a given parent_asin.

    Parameters
    ----------
    reviews_dataset : DatasetDict (the full reviews DatasetDict); only its
                      "full" split is searched
    parent_asin     : the product ASIN to filter by (non-empty string)

    Returns
    -------
    HuggingFace Dataset containing only rows matching the given parent_asin

    Raises
    ------
    TypeError : if parent_asin is empty or is not a string
    """
    if not parent_asin or not isinstance(parent_asin, str):
        raise TypeError("Invalid parent_asin passed")

    # NOTE(review): `.data.table` is the raw Arrow table behind the split;
    # if the split carries an indices mapping (e.g. after shuffle/select) it
    # is presumably not applied here -- confirm against how the dataset is
    # loaded.
    arrow_table = reviews_dataset["full"].data.table

    # Bind parent_asin as a query parameter rather than interpolating it into
    # the SQL text, so values containing quotes can neither break the query
    # nor inject SQL. The Arrow table is registered under an explicit name so
    # the query does not depend on local-variable lookup.
    con = duckdb.connect()
    try:
        con.register("reviews", arrow_table)
        matched_arrow = con.execute(
            "SELECT * FROM reviews WHERE parent_asin = ?",
            [parent_asin],
        ).fetch_arrow_table()
    finally:
        con.close()

    return Dataset(matched_arrow)
def _review_quality_scores(helpful_votes, verified, ratings):
    """Return one weighted quality score per review (parallel to the inputs).

    Components and weights:
      - helpful votes     : 50%, log1p-scaled then divided by the largest
                            value in this batch
      - verified purchase : 30%, 1.0 if truthy else 0.0
      - rating extremity  : 20%, |rating - 3| / 2 (missing rating counts as 3)
    """
    import math

    # Missing or negative vote counts are treated as 0 so log1p cannot raise
    # a domain error.
    log_votes = [
        math.log1p(max(v, 0)) if v is not None else 0.0
        for v in helpful_votes
    ]
    # Avoid dividing by zero when no review has any helpful votes.
    max_log = max(log_votes, default=0.0) or 1.0

    return [
        0.50 * (lv / max_log)
        + 0.30 * (1.0 if ver else 0.0)
        + 0.20 * (abs((r if r is not None else 3.0) - 3.0) / 2.0)
        for lv, ver, r in zip(log_votes, verified, ratings)
    ]


def get_best_reviews(
    reviews_dataset,
    parent_asin: str,
    top_k: int = None,
):
    """
    Retrieve reviews matching a given parent_asin, optionally keeping only
    the top-k highest quality reviews.

    Ranking score:
      - helpful_vote      : 50% weight (log-scaled to reduce outlier dominance)
      - verified_purchase : 30% weight (bool -> 1.0 or 0.0)
      - rating            : 20% weight (how extreme the rating is; 1 or 5
                            score higher than a neutral 3)

    Parameters
    ----------
    reviews_dataset : DatasetDict
    parent_asin     : product ASIN to filter by
    top_k           : number of top reviews to keep. None returns every
                      matching review without ranking; values below 0 are
                      treated as 0.

    Returns
    -------
    (total, dataset) tuple: `total` is the number of reviews matching
    parent_asin before any top-k cut, and `dataset` is a HuggingFace Dataset
    with the selected reviews in their original row order.
    """
    matched = get_reviews_by_asin(reviews_dataset, parent_asin)
    tot = matched.num_rows

    # Nothing to rank, or no cut requested: return every match with the true
    # match count.
    if tot == 0 or top_k is None:
        return tot, matched

    scores = _review_quality_scores(
        matched["helpful_vote"],
        matched["verified_purchase"],
        matched["rating"],
    )

    # Clamp k to [0, tot]; a negative k would otherwise slice from the end.
    k = max(0, min(top_k, tot))
    ranked = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
    # Re-sort the winning indices so the result keeps the original row order.
    keep = sorted(ranked[:k])
    return tot, matched.select(keep)