import math
from typing import Optional, Tuple

import duckdb
from datasets import Dataset


def dataset_overview(dataset_dict) -> None:
    """Print a concise overview of a DatasetDict: splits, features, row counts."""
    rule = "=" * 60
    print(f"\n{rule}")
    print(" Overview")
    print(rule)
    for split, ds in dataset_dict.items():
        print(f"\n Split : {split!r} ({ds.num_rows:,} rows)")
        print(f" {'Field':<30} dtype")
        print(f" {'-' * 45}")
        for feat, ftype in ds.features.items():
            print(f" {feat:<30} {ftype}")
    print()


def get_reviews_by_asin(
    reviews_dataset,
    parent_asin: str,
    split: str = "full",
) -> Dataset:
    """
    Retrieve all reviews matching a given parent_asin.

    Parameters
    ----------
    reviews_dataset : DatasetDict (the full reviews DatasetDict)
    parent_asin     : the product ASIN to filter by
    split           : which split to search in (default: "full")

    Returns
    -------
    HuggingFace Dataset containing only rows matching the given parent_asin

    Raises
    ------
    TypeError : if parent_asin is empty or not a string
    """
    if not parent_asin or not isinstance(parent_asin, str):
        raise TypeError("Invalid parent_asin passed")

    ds = reviews_dataset[split]

    # NOTE: the local name `arrow_table` is referenced by name inside the SQL
    # text below; DuckDB resolves it by looking up Python variables in the
    # calling scope, so do not rename it without updating the query.
    # NOTE(review): `ds.data.table` is the underlying Arrow table; if the split
    # carries an indices mapping (e.g. after shuffle/select/filter) this
    # presumably bypasses it -- confirm against callers.
    arrow_table = ds.data.table

    # Bind the ASIN as a query parameter instead of interpolating it into the
    # SQL string, so quotes in the value can neither break nor alter the query.
    matched_arrow = duckdb.execute(
        "SELECT * FROM arrow_table WHERE parent_asin = ?",
        [parent_asin],
    ).fetch_arrow_table()
    return Dataset(matched_arrow)


def _review_quality_scores(helpful_votes, verified, ratings):
    """Return one weighted quality score in [0, 1] per review."""
    # Log-scale helpful votes: log(1 + x), then normalize to [0, 1].
    # Missing or negative counts are treated as 0 so log1p never fails.
    log_votes = [math.log1p(max(v or 0, 0)) for v in helpful_votes]
    max_log = max(log_votes, default=0.0) or 1.0  # avoid dividing by zero
    norm_votes = [v / max_log for v in log_votes]

    # Verified purchase: 1.0 if truthy, 0.0 otherwise.
    norm_verified = [1.0 if v else 0.0 for v in verified]

    # Rating extremity: score = |rating - 3| / 2, so 1- and 5-star reviews
    # score 1.0 and a neutral (or missing) rating scores 0.0.
    norm_rating = [
        abs((r if r is not None else 3.0) - 3.0) / 2.0 for r in ratings
    ]

    return [
        0.50 * votes + 0.30 * ver + 0.20 * extremity
        for votes, ver, extremity in zip(norm_votes, norm_verified, norm_rating)
    ]


def get_best_reviews(
    reviews_dataset,
    parent_asin: str,
    top_k: Optional[int] = None,
    split: str = "full",
) -> Tuple[int, Dataset]:
    """
    Retrieve reviews matching a given parent_asin, optionally returning only
    the top-k highest quality reviews.

    Ranking score (all components normalized to [0, 1]):
      - helpful_vote      : 50% weight (log-scaled to reduce outlier dominance)
      - verified_purchase : 30% weight (bool -> 1.0 or 0.0)
      - rating            : 20% weight (how extreme the rating is -- 1 or 5
                            are more informative than a neutral 3)

    Parameters
    ----------
    reviews_dataset : DatasetDict
    parent_asin     : product ASIN to filter by
    top_k           : number of top reviews to return; None returns every
                      matching review without ranking. Negative values are
                      treated as 0.
    split           : which split to use (default: "full")

    Returns
    -------
    (total, reviews) : total is the number of reviews matching parent_asin
                       before any top-k cut; reviews is a HuggingFace Dataset
                       with the selected rows in their original row order.

    Raises
    ------
    TypeError : if parent_asin is empty or not a string
    """
    # Step 1: fetch every review for the product.
    matched = get_reviews_by_asin(reviews_dataset, parent_asin, split)
    tot = matched.num_rows
    if tot == 0 or top_k is None:
        return tot, matched

    # Step 2: compute scores.
    scores = _review_quality_scores(
        matched["helpful_vote"],
        matched["verified_purchase"],
        matched["rating"],
    )

    # Step 3: select top-k indices by score. Clamp k to [0, tot] so a negative
    # top_k cannot turn the slice into "drop the last |k| rows".
    k = max(0, min(top_k, tot))
    top_indices = sorted(range(tot), key=scores.__getitem__, reverse=True)[:k]

    # Re-sort the chosen indices so the result keeps the original row order.
    return tot, matched.select(sorted(top_indices))