Spaces:
Running
Running
Upload 10 files
Browse files- .gitattributes +6 -0
- data/prepare_data.py +451 -0
- data/processed/items.parquet +3 -0
- data/processed/reviews.parquet +3 -0
- data/processed/users.parquet +3 -0
- data/raw/meta_Books.jsonl +3 -0
- data/raw/meta_Kindle_Store.jsonl +3 -0
- data/raw/meta_Movies_and_TV.jsonl +3 -0
- data/raw/review_Books.jsonl +3 -0
- data/raw/review_Kindle_Store.jsonl +3 -0
- data/raw/review_Movies_and_TV.jsonl +3 -0
.gitattributes
CHANGED
|
@@ -33,3 +33,9 @@ saved_model/**/* filter=lfs diff=lfs merge=lfs -text
|
|
| 33 |
*.zip filter=lfs diff=lfs merge=lfs -text
|
| 34 |
*.zst filter=lfs diff=lfs merge=lfs -text
|
| 35 |
*tfevents* filter=lfs diff=lfs merge=lfs -text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 33 |
*.zip filter=lfs diff=lfs merge=lfs -text
|
| 34 |
*.zst filter=lfs diff=lfs merge=lfs -text
|
| 35 |
*tfevents* filter=lfs diff=lfs merge=lfs -text
|
| 36 |
+
data/raw/meta_Books.jsonl filter=lfs diff=lfs merge=lfs -text
|
| 37 |
+
data/raw/meta_Kindle_Store.jsonl filter=lfs diff=lfs merge=lfs -text
|
| 38 |
+
data/raw/meta_Movies_and_TV.jsonl filter=lfs diff=lfs merge=lfs -text
|
| 39 |
+
data/raw/review_Books.jsonl filter=lfs diff=lfs merge=lfs -text
|
| 40 |
+
data/raw/review_Kindle_Store.jsonl filter=lfs diff=lfs merge=lfs -text
|
| 41 |
+
data/raw/review_Movies_and_TV.jsonl filter=lfs diff=lfs merge=lfs -text
|
data/prepare_data.py
ADDED
|
@@ -0,0 +1,451 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""
|
| 2 |
+
Download and sample Amazon Reviews 2023 β Books, Movies_and_TV, Kindle_Store.
|
| 3 |
+
|
| 4 |
+
Source: McAuley-Lab/Amazon-Reviews-2023 on HuggingFace.
|
| 5 |
+
|
| 6 |
+
The dataset is stored as single, large JSONL files (one per category) under:
|
| 7 |
+
raw/review_categories/<Category>.jsonl
|
| 8 |
+
raw/meta_categories/meta_<Category>.jsonl
|
| 9 |
+
|
| 10 |
+
We stream these files over HTTP, line by line, and cache each phase to disk
|
| 11 |
+
so a network hiccup doesn't force us to re-download everything. After a
|
| 12 |
+
crash, re-run the script β completed phases are skipped automatically.
|
| 13 |
+
|
| 14 |
+
Disk cache layout (under data/raw/):
|
| 15 |
+
review_<Category>.jsonl β raw streamed reviews (per category)
|
| 16 |
+
meta_<Category>.jsonl β filtered metadata (per category)
|
| 17 |
+
|
| 18 |
+
Output: data/processed/{reviews,items,users}.parquet
|
| 19 |
+
"""
|
| 20 |
+
from __future__ import annotations
|
| 21 |
+
|
| 22 |
+
import argparse
|
| 23 |
+
import json
|
| 24 |
+
import logging
|
| 25 |
+
import os
|
| 26 |
+
import socket
|
| 27 |
+
import time
|
| 28 |
+
import urllib.error
|
| 29 |
+
import urllib.request
|
| 30 |
+
from pathlib import Path
|
| 31 |
+
|
| 32 |
+
import numpy as np
|
| 33 |
+
import pandas as pd
|
| 34 |
+
|
| 35 |
+
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
|
| 36 |
+
log = logging.getLogger(__name__)
|
| 37 |
+
|
| 38 |
+
CATEGORIES = ["Books", "Movies_and_TV", "Kindle_Store"]
|
| 39 |
+
|
| 40 |
+
BASE_URL = "https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023/resolve/main"
|
| 41 |
+
|
| 42 |
+
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
|
| 43 |
+
RAW_DIR = DATA_DIR / "raw"
|
| 44 |
+
PROCESSED_DIR = DATA_DIR / "processed"
|
| 45 |
+
|
| 46 |
+
REVIEW_KEEP_KEYS = ("user_id", "parent_asin", "rating", "title", "text",
|
| 47 |
+
"helpful_vote", "verified_purchase", "timestamp")
|
| 48 |
+
META_KEEP_KEYS = ("parent_asin", "title", "description", "features",
|
| 49 |
+
"categories", "average_rating", "rating_number", "price")
|
| 50 |
+
|
| 51 |
+
DEFAULT_USERS_PER_CATEGORY = 3000
|
| 52 |
+
DEFAULT_MIN_REVIEWS = 5
|
| 53 |
+
DEFAULT_MAX_REVIEWS = 50
|
| 54 |
+
DEFAULT_TEST_HOLDOUT = 2
|
| 55 |
+
|
| 56 |
+
# Network tolerance
|
| 57 |
+
NETWORK_TIMEOUT = 300 # 5 minutes per read
|
| 58 |
+
RETRY_ATTEMPTS = 4
|
| 59 |
+
RETRY_BACKOFF_BASE = 5 # seconds; doubles each retry
|
| 60 |
+
|
| 61 |
+
|
| 62 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 63 |
+
# Network primitives with retry
|
| 64 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 65 |
+
|
| 66 |
+
def _open_url(url: str, byte_offset: int = 0):
|
| 67 |
+
"""Open a URL with a long timeout and optional Range header for resumes."""
|
| 68 |
+
headers = {"User-Agent": "naijataste-ai/1.0"}
|
| 69 |
+
if byte_offset > 0:
|
| 70 |
+
headers["Range"] = f"bytes={byte_offset}-"
|
| 71 |
+
req = urllib.request.Request(url, headers=headers)
|
| 72 |
+
return urllib.request.urlopen(req, timeout=NETWORK_TIMEOUT)
|
| 73 |
+
|
| 74 |
+
|
| 75 |
+
def _is_transient(exc: BaseException) -> bool:
|
| 76 |
+
"""Network errors we should retry on."""
|
| 77 |
+
if isinstance(exc, (socket.timeout, TimeoutError)):
|
| 78 |
+
return True
|
| 79 |
+
if isinstance(exc, urllib.error.URLError):
|
| 80 |
+
# Most URLErrors wrap transient issues (DNS, conn reset)
|
| 81 |
+
return True
|
| 82 |
+
if isinstance(exc, (ConnectionResetError, ConnectionAbortedError,
|
| 83 |
+
ConnectionRefusedError)):
|
| 84 |
+
return True
|
| 85 |
+
return False
|
| 86 |
+
|
| 87 |
+
|
| 88 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 89 |
+
# Streaming with disk cache
|
| 90 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 91 |
+
|
| 92 |
+
def stream_to_cache(url: str, cache_path: Path, max_rows: int,
|
| 93 |
+
progress_every: int = 25_000) -> int:
|
| 94 |
+
"""Stream a JSONL URL to a local cache file. Returns rows written.
|
| 95 |
+
|
| 96 |
+
If cache_path already exists and contains >= max_rows lines, this is a
|
| 97 |
+
no-op. Otherwise writes line-by-line; on network failure, retries with
|
| 98 |
+
exponential backoff and resumes from where we left off.
|
| 99 |
+
"""
|
| 100 |
+
if cache_path.exists():
|
| 101 |
+
existing = sum(1 for _ in cache_path.open("r", encoding="utf-8"))
|
| 102 |
+
if existing >= max_rows:
|
| 103 |
+
log.info(f" cache hit: {cache_path.name} has {existing:,} rows β₯ target {max_rows:,}; skipping download")
|
| 104 |
+
return existing
|
| 105 |
+
log.info(f" partial cache: {cache_path.name} has {existing:,} rows; resuming")
|
| 106 |
+
rows_so_far = existing
|
| 107 |
+
mode = "a"
|
| 108 |
+
else:
|
| 109 |
+
rows_so_far = 0
|
| 110 |
+
mode = "w"
|
| 111 |
+
|
| 112 |
+
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
| 113 |
+
|
| 114 |
+
for attempt in range(1, RETRY_ATTEMPTS + 1):
|
| 115 |
+
try:
|
| 116 |
+
with _open_url(url) as resp, cache_path.open(mode, encoding="utf-8") as fout:
|
| 117 |
+
# If resuming, we need to skip lines we already have.
|
| 118 |
+
# Simpler approach: server doesn't honor byte ranges reliably
|
| 119 |
+
# on HF for line semantics, so we re-stream from start and
|
| 120 |
+
# skip the first `rows_so_far` lines.
|
| 121 |
+
skipped = 0
|
| 122 |
+
for raw in resp:
|
| 123 |
+
if not raw or raw.isspace():
|
| 124 |
+
continue
|
| 125 |
+
if skipped < rows_so_far:
|
| 126 |
+
skipped += 1
|
| 127 |
+
continue
|
| 128 |
+
# Write line as-is (already valid JSONL line ending with \n)
|
| 129 |
+
text = raw.decode("utf-8", errors="replace")
|
| 130 |
+
if not text.endswith("\n"):
|
| 131 |
+
text += "\n"
|
| 132 |
+
fout.write(text)
|
| 133 |
+
rows_so_far += 1
|
| 134 |
+
if rows_so_far % progress_every == 0:
|
| 135 |
+
log.info(f" cached {rows_so_far:,} rowsβ¦")
|
| 136 |
+
if rows_so_far >= max_rows:
|
| 137 |
+
break
|
| 138 |
+
log.info(f" β cached {rows_so_far:,} rows to {cache_path.name}")
|
| 139 |
+
return rows_so_far
|
| 140 |
+
except Exception as e:
|
| 141 |
+
if not _is_transient(e) or attempt == RETRY_ATTEMPTS:
|
| 142 |
+
raise
|
| 143 |
+
backoff = RETRY_BACKOFF_BASE * (2 ** (attempt - 1))
|
| 144 |
+
log.warning(f" network error ({type(e).__name__}: {e}); retry {attempt}/{RETRY_ATTEMPTS - 1} in {backoff}s")
|
| 145 |
+
time.sleep(backoff)
|
| 146 |
+
# Recount how much we have on disk before next attempt
|
| 147 |
+
if cache_path.exists():
|
| 148 |
+
rows_so_far = sum(1 for _ in cache_path.open("r", encoding="utf-8"))
|
| 149 |
+
mode = "a"
|
| 150 |
+
else:
|
| 151 |
+
rows_so_far = 0
|
| 152 |
+
mode = "w"
|
| 153 |
+
|
| 154 |
+
raise RuntimeError("unreachable")
|
| 155 |
+
|
| 156 |
+
|
| 157 |
+
def stream_filter_to_cache(url: str, cache_path: Path, target_asins: set[str],
|
| 158 |
+
max_scan: int, progress_every: int = 100_000) -> int:
|
| 159 |
+
"""Stream metadata, keep only rows whose parent_asin is in target, cache.
|
| 160 |
+
|
| 161 |
+
Same retry+resume semantics as stream_to_cache. Returns rows written.
|
| 162 |
+
"""
|
| 163 |
+
if cache_path.exists():
|
| 164 |
+
kept_existing = sum(1 for _ in cache_path.open("r", encoding="utf-8"))
|
| 165 |
+
log.info(f" cache hit: {cache_path.name} has {kept_existing:,} rows; using as-is")
|
| 166 |
+
return kept_existing
|
| 167 |
+
|
| 168 |
+
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
| 169 |
+
kept = 0
|
| 170 |
+
scanned = 0
|
| 171 |
+
found_asins: set[str] = set()
|
| 172 |
+
|
| 173 |
+
for attempt in range(1, RETRY_ATTEMPTS + 1):
|
| 174 |
+
try:
|
| 175 |
+
# Truncate cache on retry β we restart scanning from the top
|
| 176 |
+
# (deduplication happens at parquet stage via drop_duplicates)
|
| 177 |
+
with _open_url(url) as resp, cache_path.open("w", encoding="utf-8") as fout:
|
| 178 |
+
kept = 0
|
| 179 |
+
scanned = 0
|
| 180 |
+
found_asins = set()
|
| 181 |
+
for raw in resp:
|
| 182 |
+
if not raw or raw.isspace():
|
| 183 |
+
continue
|
| 184 |
+
scanned += 1
|
| 185 |
+
try:
|
| 186 |
+
row = json.loads(raw)
|
| 187 |
+
except json.JSONDecodeError:
|
| 188 |
+
continue
|
| 189 |
+
asin = row.get("parent_asin")
|
| 190 |
+
if asin in target_asins and asin not in found_asins:
|
| 191 |
+
text = raw.decode("utf-8", errors="replace") \
|
| 192 |
+
if isinstance(raw, bytes) else raw
|
| 193 |
+
if not text.endswith("\n"):
|
| 194 |
+
text += "\n"
|
| 195 |
+
fout.write(text)
|
| 196 |
+
kept += 1
|
| 197 |
+
found_asins.add(asin)
|
| 198 |
+
if kept >= len(target_asins):
|
| 199 |
+
break
|
| 200 |
+
if scanned % progress_every == 0:
|
| 201 |
+
log.info(f" scanned {scanned:,}, kept {kept:,}")
|
| 202 |
+
if scanned >= max_scan:
|
| 203 |
+
break
|
| 204 |
+
log.info(f" β scanned {scanned:,}, cached {kept:,} matching rows to {cache_path.name}")
|
| 205 |
+
return kept
|
| 206 |
+
except Exception as e:
|
| 207 |
+
if not _is_transient(e) or attempt == RETRY_ATTEMPTS:
|
| 208 |
+
raise
|
| 209 |
+
backoff = RETRY_BACKOFF_BASE * (2 ** (attempt - 1))
|
| 210 |
+
log.warning(f" network error ({type(e).__name__}: {e}); retry {attempt}/{RETRY_ATTEMPTS - 1} in {backoff}s")
|
| 211 |
+
time.sleep(backoff)
|
| 212 |
+
|
| 213 |
+
raise RuntimeError("unreachable")
|
| 214 |
+
|
| 215 |
+
|
| 216 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 217 |
+
# Cache β DataFrame loaders
|
| 218 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 219 |
+
|
| 220 |
+
def load_reviews_from_cache(cache_path: Path, category: str) -> pd.DataFrame:
|
| 221 |
+
rows = []
|
| 222 |
+
with cache_path.open("r", encoding="utf-8") as f:
|
| 223 |
+
for raw in f:
|
| 224 |
+
try:
|
| 225 |
+
r = json.loads(raw)
|
| 226 |
+
except json.JSONDecodeError:
|
| 227 |
+
continue
|
| 228 |
+
rows.append({k: r.get(k) for k in REVIEW_KEEP_KEYS})
|
| 229 |
+
df = pd.DataFrame(rows)
|
| 230 |
+
df["domain"] = category
|
| 231 |
+
return df
|
| 232 |
+
|
| 233 |
+
|
| 234 |
+
def load_meta_from_cache(cache_path: Path, category: str) -> pd.DataFrame:
|
| 235 |
+
rows = []
|
| 236 |
+
with cache_path.open("r", encoding="utf-8") as f:
|
| 237 |
+
for raw in f:
|
| 238 |
+
try:
|
| 239 |
+
r = json.loads(raw)
|
| 240 |
+
except json.JSONDecodeError:
|
| 241 |
+
continue
|
| 242 |
+
row = {}
|
| 243 |
+
for k in META_KEEP_KEYS:
|
| 244 |
+
v = r.get(k)
|
| 245 |
+
if isinstance(v, list):
|
| 246 |
+
v = " ".join(str(x) for x in v if x is not None)
|
| 247 |
+
row[k] = v
|
| 248 |
+
row["domain"] = category
|
| 249 |
+
rows.append(row)
|
| 250 |
+
df = pd.DataFrame(rows)
|
| 251 |
+
if not df.empty:
|
| 252 |
+
for col in ("description", "features"):
|
| 253 |
+
if col in df.columns:
|
| 254 |
+
df[col] = df[col].astype(str).str.slice(0, 2000)
|
| 255 |
+
return df
|
| 256 |
+
|
| 257 |
+
|
| 258 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 259 |
+
# Sampling, splits, normalization (unchanged from v3)
|
| 260 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 261 |
+
|
| 262 |
+
def sample_users(reviews: pd.DataFrame, min_reviews: int, max_reviews: int,
|
| 263 |
+
target_users: int) -> pd.DataFrame:
|
| 264 |
+
counts = reviews.groupby("user_id").agg(
|
| 265 |
+
n_reviews=("rating", "size"),
|
| 266 |
+
n_domains=("domain", "nunique"),
|
| 267 |
+
).reset_index()
|
| 268 |
+
|
| 269 |
+
eligible = counts[(counts["n_reviews"] >= min_reviews)
|
| 270 |
+
& (counts["n_reviews"] <= max_reviews)]
|
| 271 |
+
log.info(f"{len(eligible):,} users in [{min_reviews},{max_reviews}] reviews")
|
| 272 |
+
|
| 273 |
+
cross = eligible[eligible["n_domains"] >= 2]
|
| 274 |
+
single = eligible[eligible["n_domains"] == 1]
|
| 275 |
+
|
| 276 |
+
n_cross = min(len(cross), target_users // 3)
|
| 277 |
+
n_single = min(len(single), target_users - n_cross)
|
| 278 |
+
|
| 279 |
+
log.info(f"Sampling {n_cross:,} cross-domain + {n_single:,} single-domain users")
|
| 280 |
+
rng = np.random.default_rng(42)
|
| 281 |
+
cross_s = cross.sample(n=n_cross, random_state=rng.integers(1e9)) if n_cross else cross.head(0)
|
| 282 |
+
single_s = single.sample(n=n_single, random_state=rng.integers(1e9)) if n_single else single.head(0)
|
| 283 |
+
return pd.concat([cross_s, single_s], ignore_index=True)
|
| 284 |
+
|
| 285 |
+
|
| 286 |
+
def build_train_test_splits(reviews: pd.DataFrame, holdout: int) -> pd.DataFrame:
|
| 287 |
+
reviews = reviews.sort_values(["user_id", "timestamp"], ascending=[True, True])
|
| 288 |
+
reviews["rank_within_user"] = reviews.groupby("user_id").cumcount(ascending=False)
|
| 289 |
+
reviews["split"] = np.where(reviews["rank_within_user"] < holdout, "test", "train")
|
| 290 |
+
return reviews.drop(columns=["rank_within_user"])
|
| 291 |
+
|
| 292 |
+
|
| 293 |
+
def normalize_items_for_parquet(items: pd.DataFrame) -> pd.DataFrame:
|
| 294 |
+
"""Coerce messy item-metadata columns to clean dtypes."""
|
| 295 |
+
if items.empty:
|
| 296 |
+
return items
|
| 297 |
+
out = items.copy()
|
| 298 |
+
for col in ("price", "average_rating", "rating_number"):
|
| 299 |
+
if col in out.columns:
|
| 300 |
+
s = out[col].astype(str).str.replace(r"^\$", "", regex=True)
|
| 301 |
+
out[col] = pd.to_numeric(s, errors="coerce")
|
| 302 |
+
for col in ("parent_asin", "title", "description", "features",
|
| 303 |
+
"categories", "domain"):
|
| 304 |
+
if col in out.columns:
|
| 305 |
+
out[col] = out[col].astype(str).replace({"None": "", "nan": ""})
|
| 306 |
+
return out
|
| 307 |
+
|
| 308 |
+
|
| 309 |
+
def build_user_stats(reviews_train: pd.DataFrame) -> pd.DataFrame:
|
| 310 |
+
def lens(s):
|
| 311 |
+
return s.fillna("").astype(str).str.split().str.len()
|
| 312 |
+
stats = reviews_train.groupby("user_id").agg(
|
| 313 |
+
n_reviews=("rating", "size"),
|
| 314 |
+
avg_rating=("rating", "mean"),
|
| 315 |
+
std_rating=("rating", "std"),
|
| 316 |
+
avg_review_length=("text", lambda s: lens(s).mean()),
|
| 317 |
+
std_review_length=("text", lambda s: lens(s).std()),
|
| 318 |
+
verified_rate=("verified_purchase", "mean"),
|
| 319 |
+
domains=("domain", lambda s: list(s.unique())),
|
| 320 |
+
n_domains=("domain", "nunique"),
|
| 321 |
+
).reset_index()
|
| 322 |
+
stats["std_rating"] = stats["std_rating"].fillna(0)
|
| 323 |
+
stats["std_review_length"] = stats["std_review_length"].fillna(0)
|
| 324 |
+
return stats
|
| 325 |
+
|
| 326 |
+
|
| 327 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 328 |
+
# Main
|
| 329 |
+
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
|
| 330 |
+
|
| 331 |
+
def main():
|
| 332 |
+
ap = argparse.ArgumentParser()
|
| 333 |
+
ap.add_argument("--rows-per-category", type=int, default=150_000)
|
| 334 |
+
ap.add_argument("--meta-scan-cap", type=int, default=600_000,
|
| 335 |
+
help="Max metadata rows to scan per category (smaller=faster)")
|
| 336 |
+
ap.add_argument("--target-users", type=int, default=DEFAULT_USERS_PER_CATEGORY * 3)
|
| 337 |
+
ap.add_argument("--min-reviews", type=int, default=DEFAULT_MIN_REVIEWS)
|
| 338 |
+
ap.add_argument("--max-reviews", type=int, default=DEFAULT_MAX_REVIEWS)
|
| 339 |
+
ap.add_argument("--test-holdout", type=int, default=DEFAULT_TEST_HOLDOUT)
|
| 340 |
+
ap.add_argument("--skip-meta", action="store_true",
|
| 341 |
+
help="Skip metadata download; use review titles as item info")
|
| 342 |
+
args = ap.parse_args()
|
| 343 |
+
|
| 344 |
+
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
|
| 345 |
+
RAW_DIR.mkdir(parents=True, exist_ok=True)
|
| 346 |
+
|
| 347 |
+
# ββ Phase 1: reviews (cached per category) ββββββββββββββββββββββββββββββ
|
| 348 |
+
log.info("=" * 70)
|
| 349 |
+
log.info("PHASE 1: downloading review files (resumable)")
|
| 350 |
+
log.info("=" * 70)
|
| 351 |
+
for cat in CATEGORIES:
|
| 352 |
+
cache_path = RAW_DIR / f"review_{cat}.jsonl"
|
| 353 |
+
url = f"{BASE_URL}/raw/review_categories/{cat}.jsonl"
|
| 354 |
+
log.info(f"[{cat}] reviews β {cache_path.name}")
|
| 355 |
+
stream_to_cache(url, cache_path, max_rows=args.rows_per_category)
|
| 356 |
+
|
| 357 |
+
all_reviews = []
|
| 358 |
+
for cat in CATEGORIES:
|
| 359 |
+
cache_path = RAW_DIR / f"review_{cat}.jsonl"
|
| 360 |
+
df = load_reviews_from_cache(cache_path, cat)
|
| 361 |
+
log.info(f"[{cat}] loaded {len(df):,} reviews from cache")
|
| 362 |
+
all_reviews.append(df)
|
| 363 |
+
reviews = pd.concat(all_reviews, ignore_index=True)
|
| 364 |
+
log.info(f"Total raw reviews: {len(reviews):,}")
|
| 365 |
+
|
| 366 |
+
# ββ Phase 2: clean + sample users + splits βββββββββββββββββββββββββββββ
|
| 367 |
+
log.info("=" * 70)
|
| 368 |
+
log.info("PHASE 2: filtering, sampling, splits")
|
| 369 |
+
log.info("=" * 70)
|
| 370 |
+
reviews = reviews.dropna(subset=["user_id", "parent_asin", "rating", "text"])
|
| 371 |
+
reviews = reviews[reviews["text"].astype(str).str.len() > 20]
|
| 372 |
+
log.info(f"After cleaning: {len(reviews):,} reviews")
|
| 373 |
+
|
| 374 |
+
user_sample = sample_users(reviews, args.min_reviews, args.max_reviews,
|
| 375 |
+
args.target_users)
|
| 376 |
+
keep_users = set(user_sample["user_id"])
|
| 377 |
+
reviews = reviews[reviews["user_id"].isin(keep_users)].reset_index(drop=True)
|
| 378 |
+
log.info(f"After user filter: {len(reviews):,} reviews / {len(keep_users):,} users")
|
| 379 |
+
|
| 380 |
+
reviews = build_train_test_splits(reviews, holdout=args.test_holdout)
|
| 381 |
+
n_train = (reviews["split"] == "train").sum()
|
| 382 |
+
n_test = (reviews["split"] == "test").sum()
|
| 383 |
+
log.info(f"Train: {n_train:,} | Test: {n_test:,}")
|
| 384 |
+
|
| 385 |
+
# ββ Phase 3: metadata (cached per category) βββββββββββββββββββββββββββββ
|
| 386 |
+
log.info("=" * 70)
|
| 387 |
+
log.info("PHASE 3: downloading item metadata (resumable)")
|
| 388 |
+
log.info("=" * 70)
|
| 389 |
+
if args.skip_meta:
|
| 390 |
+
log.info("--skip-meta set; building minimal catalog from review titles")
|
| 391 |
+
items = (reviews.groupby(["parent_asin", "domain"])
|
| 392 |
+
.agg(title=("title", "first"))
|
| 393 |
+
.reset_index())
|
| 394 |
+
items["description"] = ""
|
| 395 |
+
items["features"] = ""
|
| 396 |
+
items["categories"] = ""
|
| 397 |
+
items["average_rating"] = None
|
| 398 |
+
items["rating_number"] = None
|
| 399 |
+
items["price"] = None
|
| 400 |
+
else:
|
| 401 |
+
for cat in CATEGORIES:
|
| 402 |
+
cache_path = RAW_DIR / f"meta_{cat}.jsonl"
|
| 403 |
+
url = f"{BASE_URL}/raw/meta_categories/meta_{cat}.jsonl"
|
| 404 |
+
cat_asins = set(reviews.loc[reviews["domain"] == cat, "parent_asin"])
|
| 405 |
+
log.info(f"[{cat}] metadata β {cache_path.name} (target {len(cat_asins):,} items)")
|
| 406 |
+
stream_filter_to_cache(url, cache_path, cat_asins,
|
| 407 |
+
max_scan=args.meta_scan_cap)
|
| 408 |
+
|
| 409 |
+
all_items = []
|
| 410 |
+
for cat in CATEGORIES:
|
| 411 |
+
cache_path = RAW_DIR / f"meta_{cat}.jsonl"
|
| 412 |
+
df = load_meta_from_cache(cache_path, cat)
|
| 413 |
+
log.info(f"[{cat}] loaded {len(df):,} metadata rows from cache")
|
| 414 |
+
all_items.append(df)
|
| 415 |
+
items = pd.concat(all_items, ignore_index=True)
|
| 416 |
+
if not items.empty:
|
| 417 |
+
items = items.drop_duplicates(subset=["parent_asin"])
|
| 418 |
+
|
| 419 |
+
# Fallback for items without metadata: use review title
|
| 420 |
+
found = set(items["parent_asin"]) if not items.empty else set()
|
| 421 |
+
missing = (reviews[~reviews["parent_asin"].isin(found)]
|
| 422 |
+
.groupby(["parent_asin", "domain"])
|
| 423 |
+
.agg(title=("title", "first"))
|
| 424 |
+
.reset_index())
|
| 425 |
+
if not missing.empty:
|
| 426 |
+
for col in ("description", "features", "categories"):
|
| 427 |
+
missing[col] = ""
|
| 428 |
+
for col in ("average_rating", "rating_number", "price"):
|
| 429 |
+
missing[col] = None
|
| 430 |
+
items = pd.concat([items, missing], ignore_index=True)
|
| 431 |
+
log.info(f"Added {len(missing):,} items from review-title fallback")
|
| 432 |
+
|
| 433 |
+
# ββ Phase 4: write parquet outputs ββββββββββββββββββββββββββββββββββββββ
|
| 434 |
+
log.info("=" * 70)
|
| 435 |
+
log.info("PHASE 4: writing processed parquet files")
|
| 436 |
+
log.info("=" * 70)
|
| 437 |
+
user_stats = build_user_stats(reviews[reviews["split"] == "train"])
|
| 438 |
+
items = normalize_items_for_parquet(items)
|
| 439 |
+
|
| 440 |
+
reviews.to_parquet(PROCESSED_DIR / "reviews.parquet", index=False)
|
| 441 |
+
items.to_parquet(PROCESSED_DIR / "items.parquet", index=False)
|
| 442 |
+
user_stats.to_parquet(PROCESSED_DIR / "users.parquet", index=False)
|
| 443 |
+
log.info(f"Wrote processed files to {PROCESSED_DIR}/")
|
| 444 |
+
log.info(f" reviews.parquet: {len(reviews):,} rows")
|
| 445 |
+
log.info(f" items.parquet: {len(items):,} rows")
|
| 446 |
+
log.info(f" users.parquet: {len(user_stats):,} rows")
|
| 447 |
+
log.info("Done.")
|
| 448 |
+
|
| 449 |
+
|
| 450 |
+
if __name__ == "__main__":
|
| 451 |
+
main()
|
data/processed/items.parquet
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:7fb677fd8ee43d658f0e89c069b086d7970b60c38d9c6351cb82702c9f763a78
|
| 3 |
+
size 24175703
|
data/processed/reviews.parquet
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:146d5580b3bfecfda13c2830494a2f307e9d596d51b4328f4189c21f1dc7ee51
|
| 3 |
+
size 18938270
|
data/processed/users.parquet
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:b04d6ff1b4fe5070c4ecb003a0bc10e808a081745fed987cc0dcc530c132cc82
|
| 3 |
+
size 378710
|
data/raw/meta_Books.jsonl
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:ebb16d52728bb3003bcdc1b97cc946d6a83bd580e05de7f459217ef518e48d2e
|
| 3 |
+
size 22242306
|
data/raw/meta_Kindle_Store.jsonl
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:72e1cf19f353726bf01acb17593b4a06768beb131fbc4015258eb8ee3b4f859b
|
| 3 |
+
size 63585913
|
data/raw/meta_Movies_and_TV.jsonl
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:b5ce1ab12dd34f020eff8aeff57d489478a8c2d7f74154b3e18954b9f1ade031
|
| 3 |
+
size 54309438
|
data/raw/review_Books.jsonl
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:2a9b93b6f07db1f5ad7406f5071d8dddb6d4ab3fbe65b5aba77e1263220d216c
|
| 3 |
+
size 130059526
|
data/raw/review_Kindle_Store.jsonl
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:007c1e7e87b2035df32d6b0f4d8d535cdb752909753dd13c31f7a7f2ef72edf0
|
| 3 |
+
size 100094891
|
data/raw/review_Movies_and_TV.jsonl
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:dc3c191a9debb16cb6b6341a90035cc1d4f7c8af637c00e1531bf561cab50d68
|
| 3 |
+
size 78862448
|