MayankChoudhary76 committed
Commit 24a5fa2 · 1 Parent(s): 49b4fc5

✅ Final API changes

Files changed (3)
  1. api/app_api.py +28 -0
  2. src/service/recommender.py +30 -368
  3. src/utils/paths.py +36 -35
api/app_api.py CHANGED
@@ -1,5 +1,33 @@
  # api/app_api.py (Part 1/5)

+ # ✅ Insert this at the top of app_api.py imports
+ from fastapi import APIRouter
+ from huggingface_hub import hf_hub_download
+
+ # ✅ Add this new router declaration
+ router = APIRouter()
+
+ # ✅ Add this new /manifest route definition
+ @router.get("/manifest")
+ def get_file_manifest():
+     """Serve file_manifest.json from HF dataset repo dynamically."""
+     try:
+         manifest_path = hf_hub_download(
+             repo_id="mickey1976/mayankc-amazon_beauty_subset",
+             filename="file_manifest.json",
+             repo_type="dataset"
+         )
+         with open(manifest_path, "r") as f:
+             manifest = json.load(f)
+         return {"ok": True, "manifest": manifest}
+     except Exception as e:
+         return {"ok": False, "error": str(e)}
+
+ # ✅ Register this router in your FastAPI app
+ # At the bottom of app_api.py (or wherever app = FastAPI is defined):
+
+ app.include_router(router)
+
  from __future__ import annotations

  import os
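
For a quick sanity check of the new route, a minimal client-side sketch (the base URL and the use of the requests library are assumptions for local testing, not part of the commit):

# Assumes the API is running locally, e.g. `uvicorn api.app_api:app --port 8000`.
import requests

resp = requests.get("http://localhost:8000/manifest", timeout=30)
resp.raise_for_status()
payload = resp.json()
if payload.get("ok"):
    # file_manifest.json as served from the HF dataset repo
    print(payload["manifest"])
else:
    print("manifest fetch failed:", payload.get("error"))
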
src/service/recommender.py CHANGED
@@ -1,394 +1,56 @@
- # src/service/recommender.py
- from __future__ import annotations
-
- from dataclasses import dataclass, field
  from pathlib import Path
- from typing import Optional, Tuple, List, Dict, Any
-
- import json
- import numpy as np
- import pandas as pd
-
- from src.utils.paths import get_processed_path
- from src.models.fusion import concat_fusion, weighted_sum_fusion
-
- ITEM_KEY = "item_id"
-
-
- # ---------------------------- dataclasses ---------------------------- #
-
- @dataclass
- class FusionWeights:
-     text: float = 1.0
-     image: float = 0.0
-     meta: float = 0.0
-
-
- @dataclass
- class RecommendConfig:
-     dataset: str
-     user_id: str
-     k: int = 10
-     fusion: str = "weighted"
-     weights: FusionWeights = field(default_factory=FusionWeights)
-     use_faiss: bool = False
-     faiss_name: Optional[str] = None
-     exclude_seen: bool = True
-     alpha: Optional[float] = None  # legacy param accepted, ignored
-
-
- # ---------------------------- IO helpers ---------------------------- #
-
- def _proc_dir(dataset: str) -> Path:
-     p = Path(get_processed_path(dataset))
-     if not p.exists():
-         raise FileNotFoundError(f"Processed dir not found: {p}")
-     return p
-
-
- def _read_parquet(fp: Path, required_cols: Optional[List[str]] = None) -> pd.DataFrame:
-     if not fp.exists():
-         raise FileNotFoundError(f"Missing file: {fp}")
-     df = pd.read_parquet(fp)
-     if required_cols:
-         missing = [c for c in required_cols if c not in df.columns]
-         if missing:
-             raise KeyError(f"{fp} missing columns: {missing} | has {list(df.columns)}")
-     return df

+ # Replace hardcoded path with Hugging Face-aware fallback
+ from src.utils.paths import get_processed_path, _hf_download

  def _load_defaults(dataset: str) -> Dict[str, Dict[str, Any]]:
-     fp = Path(f"data/processed/{dataset}/index/defaults.json")
-     if not fp.exists():
-         return {}
+     """
+     Load defaults.json for a dataset.
+     Try local path first; fall back to HF hub if needed.
+     """
+     try:
+         fp = get_processed_path(dataset) / "index" / "defaults.json"
+         if fp.exists():
+             return json.loads(fp.read_text())
+     except Exception:
+         pass
      try:
-         return json.loads(fp.read_text())
+         # fallback (root-level for HF structure)
+         return json.loads(_hf_download("json/defaults.json").read_text())
      except Exception:
          return {}


+ # Likewise for these load functions:
+
  def _load_user_vec(proc: Path, user_id: str) -> np.ndarray:
-     dfu = _read_parquet(proc / "user_text_emb.parquet", ["user_id", "vector"])
+     try:
+         dfu = _read_parquet(proc / "user_text_emb.parquet", ["user_id", "vector"])
+     except FileNotFoundError:
+         dfu = pd.read_parquet(_hf_download("parquet/user_text_emb.parquet"), columns=["user_id", "vector"])
      row = dfu[dfu["user_id"] == user_id]
      if row.empty:
-         raise ValueError(
-             f"user_id not found in user_text_emb.parquet: {user_id}. "
-             f"Run scripts/build_text_emb.py."
-         )
+         raise ValueError(f"user_id '{user_id}' not found. Run text embedding step.")
      v = np.asarray(row.iloc[0]["vector"], dtype=np.float32)
      return v / (np.linalg.norm(v) + 1e-12)


- def _load_item_dfs(proc: Path):
-     Mt = _read_parquet(proc / "item_text_emb.parquet", [ITEM_KEY, "vector"])
-     Mi = _read_parquet(proc / "item_image_emb.parquet", [ITEM_KEY, "vector"])
-     meta_fp = proc / "item_meta_emb.parquet"
-     Mm = _read_parquet(meta_fp, [ITEM_KEY, "vector"]) if meta_fp.exists() else None
-     return Mt, Mi, Mm
-
-
  def _load_items_table(proc: Path) -> pd.DataFrame:
-     items = _read_parquet(proc / "items_with_meta.parquet")
+     try:
+         items = _read_parquet(proc / "items_with_meta.parquet")
+     except FileNotFoundError:
+         items = pd.read_parquet(_hf_download("parquet/items_with_meta.parquet"))
      if ITEM_KEY not in items.columns:
          if items.index.name == ITEM_KEY:
              items = items.reset_index()
          else:
-             raise KeyError(f"{ITEM_KEY} not found in items_with_meta.parquet")
+             raise KeyError(f"'{ITEM_KEY}' not found in items_with_meta.parquet")
      return items


  def _user_seen_items(proc: Path, user_id: str) -> set:
-     df = _read_parquet(proc / "reviews.parquet", ["user_id", ITEM_KEY])
-     return set(df[df["user_id"] == user_id][ITEM_KEY].tolist())
-
-
- # --------------------------- math helpers --------------------------- #
-
- def _l2norm_rows(M: np.ndarray) -> np.ndarray:
-     return M / (np.linalg.norm(M, axis=1, keepdims=True) + 1e-12)
-
-
- def _cosine_scores(query: np.ndarray, matrix: np.ndarray) -> np.ndarray:
-     q = query.reshape(1, -1)
-     q = q / (np.linalg.norm(q) + 1e-12)
-     M = _l2norm_rows(matrix)
-     return (q @ M.T).ravel()
-
-
- def _align_modalities(Mt: pd.DataFrame, Mi: pd.DataFrame, Mm: Optional[pd.DataFrame]):
-     base = Mt[[ITEM_KEY]].merge(Mi[[ITEM_KEY]], on=ITEM_KEY)
-     if Mm is not None:
-         base = base.merge(Mm[[ITEM_KEY]], on=ITEM_KEY)
-     item_ids = base[ITEM_KEY].tolist()
-
-     def reindex(mat_df: pd.DataFrame, ids: List[str]) -> np.ndarray:
-         v = mat_df.set_index(ITEM_KEY).loc[ids, "vector"].to_numpy()
-         return np.vstack(v).astype(np.float32)
-
-     Vt = reindex(Mt, item_ids)
-     Vi = reindex(Mi, item_ids)
-     Vm = reindex(Mm, item_ids) if Mm is not None else None
-     return item_ids, Vt, Vi, Vm
-
- def _concat_user_vector(user_text_vec: np.ndarray,
-                         dim_t: int, dim_i: int, dim_m: int,
-                         w_text: float, w_image: float, w_meta: float) -> np.ndarray:
-     ut = user_text_vec / (np.linalg.norm(user_text_vec) + 1e-12)
-     parts = [w_text * ut]
-     if dim_i > 0:
-         parts.append(np.zeros((dim_i,), dtype=np.float32))
-     if dim_m > 0:
-         parts.append(np.zeros((dim_m,), dtype=np.float32))
-     uf = np.concatenate(parts, axis=0).astype(np.float32)
-     return uf / (np.linalg.norm(uf) + 1e-12)
-
-
- def _weighted_user_vector(user_text_vec: np.ndarray, target_dim: int, w_text: float) -> np.ndarray:
-     ut = (w_text * user_text_vec).astype(np.float32)
-     ut = ut / (np.linalg.norm(ut) + 1e-12)
-     d = ut.shape[0]
-     if d == target_dim:
-         uf = ut
-     elif d < target_dim:
-         pad = np.zeros((target_dim - d,), dtype=np.float32)
-         uf = np.concatenate([ut, pad], axis=0)
-     else:
-         uf = ut[:target_dim]
-     return uf / (np.linalg.norm(uf) + 1e-12)
-
-
- # -------------------------- FAISS integration ----------------------- #
-
- def _faiss_search(proc: Path, name: str, query_vec: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]:
      try:
-         import faiss  # type: ignore
-     except Exception as e:
-         raise RuntimeError("FAISS not available. Install faiss-cpu or disable use_faiss.") from e
-
-     idx_dir = proc / "index"
-     index_fp = idx_dir / f"items_{name}.faiss"
-     ids_fp = idx_dir / f"items_{name}.npy"
-     if not index_fp.exists() or not ids_fp.exists():
-         raise FileNotFoundError(f"FAISS index or ids not found: {index_fp}, {ids_fp}")
-
-     index = faiss.read_index(str(index_fp))
-     q = query_vec.astype(np.float32).reshape(1, -1)
-     D, I = index.search(q, k)
-     ids = np.load(ids_fp, allow_pickle=True)
-     return D[0], ids[I[0]]
-
-
- def _resolve_faiss_name(dataset: str, fusion: str, faiss_name: Optional[str], defaults: Dict[str, Dict[str, Any]]) -> str:
-     """
-     Order of resolution:
-     1) explicit faiss_name if provided
-     2) defaults.json → defaults[fusion].faiss_name if present
-     3) conventional fallback:
-        - concat: f"{dataset}_concat"
-        - weighted: f"{dataset}_weighted_wt{wt}_wi{wi}_wm{wm}" (rounded)
-     """
-     if faiss_name:
-         return faiss_name
-
-     d = (defaults or {}).get(fusion, {})
-     if isinstance(d, dict):
-         n = d.get("faiss_name") or d.get("index_name")
-         if isinstance(n, str) and n:
-             return n
-
-     if fusion == "concat":
-         return f"{dataset}_concat"
-
-     # weighted fallback uses weights baked into index filename
-     wt = d.get("w_text", 1.0)
-     wi = d.get("w_image", 0.0)
-     wm = d.get("w_meta", 0.0)
-
-     def _fmt(x: float) -> str:
-         return f"{x:.1f}".rstrip("0").rstrip(".") if "." in f"{x:.1f}" else f"{x:.1f}"
-
-     return f"{dataset}_weighted_wt{_fmt(wt)}_wi{_fmt(wi)}_wm{_fmt(wm)}"
-
- # ----------------------------- core logic --------------------------- #
-
- def _recommend_concat(proc: Path,
-                       dataset: str,
-                       user_id: str,
-                       k: int,
-                       exclude_seen: bool,
-                       use_faiss: bool,
-                       faiss_name: Optional[str],
-                       w_text: float,
-                       w_image: float,
-                       w_meta: float) -> Tuple[pd.DataFrame, List[str]]:
-     items_df = _load_items_table(proc)
-     Mt, Mi, Mm = _load_item_dfs(proc)
-     user_vec = _load_user_vec(proc, user_id)
-     item_ids, Vt, Vi, Vm = _align_modalities(Mt, Mi, Mm)
-
-     # Build fused item matrix and a compatible user vector
-     Vf = concat_fusion(Vt, Vi, Vm, weights=(w_text, w_image, w_meta))
-     uf = _concat_user_vector(
-         user_text_vec=user_vec,
-         dim_t=Vt.shape[1],
-         dim_i=Vi.shape[1],
-         dim_m=0 if Vm is None else Vm.shape[1],
-         w_text=w_text, w_image=w_image, w_meta=w_meta
-     )
-
-     # Exclusions
-     exclude = _user_seen_items(proc, user_id) if exclude_seen else set()
-
-     # Search
-     rec_ids: List[str]
-     scores: np.ndarray
-     if use_faiss:
-         # Auto-resolve index name if missing
-         defaults = _load_defaults(dataset)
-         idx_name = _resolve_faiss_name(dataset, "concat", faiss_name, defaults)
-         D, hits = _faiss_search(proc, idx_name, uf, k + 200)
-         # Keep in catalog order map to fetch scores from Vf
-         id2row = {iid: i for i, iid in enumerate(item_ids)}
-         rec_ids = [iid for iid in hits.tolist() if iid not in exclude][:k]
-         sel = np.array([id2row[i] for i in rec_ids], dtype=np.int64)
-         scores = (uf.reshape(1, -1) @ _l2norm_rows(Vf[sel]).T).ravel()
-     else:
-         scores_all = (uf.reshape(1, -1) @ _l2norm_rows(Vf).T).ravel()
-         mask = np.array([iid not in exclude for iid in item_ids], dtype=bool)
-         scores_all = np.where(mask, scores_all, -np.inf)
-         topk_idx = np.argpartition(scores_all, -k)[-k:]
-         topk_idx = topk_idx[np.argsort(scores_all[topk_idx])[::-1]]
-         rec_ids = [item_ids[i] for i in topk_idx]
-         scores = scores_all[topk_idx]
-
-     out = items_df.merge(
-         pd.DataFrame({ITEM_KEY: rec_ids, "score": scores}),
-         on=ITEM_KEY, how="right"
-     )
-     out = out.sort_values("score", ascending=False).reset_index(drop=True)
-     return out, rec_ids
-
- def _recommend_weighted(proc: Path,
-                         dataset: str,
-                         user_id: str,
-                         k: int,
-                         exclude_seen: bool,
-                         use_faiss: bool,
-                         faiss_name: Optional[str],
-                         w_text: float,
-                         w_image: float,
-                         w_meta: float) -> Tuple[pd.DataFrame, List[str]]:
-     items_df = _load_items_table(proc)
-     Mt, Mi, Mm = _load_item_dfs(proc)
-     user_vec = _load_user_vec(proc, user_id)
-     item_ids, Vt, Vi, Vm = _align_modalities(Mt, Mi, Mm)
-
-     # Fuse items with weighted-sum and create a compatible user vector
-     Vf = weighted_sum_fusion(Vt, Vi, Vm, weights=(w_text, w_image, w_meta))
-     uf = _weighted_user_vector(user_text_vec=user_vec, target_dim=Vf.shape[1], w_text=w_text)
-
-     # Exclusions
-     exclude = _user_seen_items(proc, user_id) if exclude_seen else set()
-
-     # Search
-     rec_ids: List[str]
-     scores: np.ndarray
-     if use_faiss:
-         defaults = _load_defaults(dataset)
-         idx_name = _resolve_faiss_name(dataset, "weighted", faiss_name, defaults)
-         D, hits = _faiss_search(proc, idx_name, uf, k + 200)
-         # filter seen, then clip
-         filtered = [(float(d), iid) for d, iid in zip(D.tolist(), hits.tolist()) if iid not in exclude]
-         filtered = filtered[:k]
-         if filtered:
-             scores = np.array([d for d, _ in filtered], dtype=np.float32)
-             rec_ids = [iid for _, iid in filtered]
-         else:
-             scores = np.array([], dtype=np.float32)
-             rec_ids = []
-     else:
-         scores_all = (uf.reshape(1, -1) @ _l2norm_rows(Vf).T).ravel()
-         mask = np.array([iid not in exclude for iid in item_ids], dtype=bool)
-         scores_all = np.where(mask, scores_all, -np.inf)
-         topk_idx = np.argpartition(scores_all, -k)[-k:]
-         topk_idx = topk_idx[np.argsort(scores_all[topk_idx])[::-1]]
-         rec_ids = [item_ids[i] for i in topk_idx]
-         scores = scores_all[topk_idx]
-
-     out = items_df.merge(
-         pd.DataFrame({ITEM_KEY: rec_ids, "score": scores}),
-         on=ITEM_KEY, how="right"
-     )
-     out = out.sort_values("score", ascending=False).reset_index(drop=True)
-     return out, rec_ids
-
- # -------------------------- public API ------------------------------ #
-
- def recommend_for_user(cfg: RecommendConfig) -> Dict[str, Any]:
-     """
-     Entry point used by api/app_api.py. Returns a dict ready for JSON response.
-     It also auto-loads defaults.json to fill in weights/Faiss name when omitted.
-     """
-     proc = _proc_dir(cfg.dataset)
-     defaults = _load_defaults(cfg.dataset)
-
-     # Resolve weights: cfg.weights (if set) < defaults.json < fallback
-     defw = defaults.get(cfg.fusion, {}) if defaults else {}
-     wt = (cfg.weights.text
-           if (cfg.weights and cfg.weights.text is not None)
-           else defw.get("w_text", 1.0))
-     wi = (cfg.weights.image
-           if (cfg.weights and cfg.weights.image is not None)
-           else defw.get("w_image", 0.0))
-     wm = (cfg.weights.meta
-           if (cfg.weights and cfg.weights.meta is not None)
-           else defw.get("w_meta", 0.0))
-
-     # Route to correct recommender
-     if cfg.fusion == "concat":
-         out, rec_ids = _recommend_concat(
-             proc=proc,
-             dataset=cfg.dataset,
-             user_id=cfg.user_id,
-             k=cfg.k,
-             exclude_seen=cfg.exclude_seen,
-             use_faiss=cfg.use_faiss,
-             faiss_name=cfg.faiss_name,
-             w_text=float(wt), w_image=float(wi), w_meta=float(wm),
-         )
-     elif cfg.fusion == "weighted":
-         out, rec_ids = _recommend_weighted(
-             proc=proc,
-             dataset=cfg.dataset,
-             user_id=cfg.user_id,
-             k=cfg.k,
-             exclude_seen=cfg.exclude_seen,
-             use_faiss=cfg.use_faiss,
-             faiss_name=cfg.faiss_name,
-             w_text=float(wt), w_image=float(wi), w_meta=float(wm),
-         )
-     else:
-         raise ValueError("fusion must be one of {'concat','weighted'}")
-
-     # Ensure purely JSON-serializable payload
-     cols = [c for c in [ITEM_KEY, "score", "brand", "price", "categories", "image_url"]
-             if c in out.columns]
-     if "score" in cols:
-         out["score"] = out["score"].astype(float)
-
-     records: List[Dict[str, Any]] = out[cols].head(int(cfg.k)).to_dict(orient="records")
-
-     return {
-         "dataset": cfg.dataset,
-         "user_id": cfg.user_id,
-         "fusion": cfg.fusion,
-         "weights": {"text": float(wt), "image": float(wi), "meta": float(wm)},
-         "k": int(cfg.k),
-         "exclude_seen": bool(cfg.exclude_seen),
-         "use_faiss": bool(cfg.use_faiss),
-         "faiss_name": cfg.faiss_name,
-         "results": records,
-     }
-
-
- __all__ = ["FusionWeights", "RecommendConfig", "recommend_for_user"]
+         df = _read_parquet(proc / "reviews.parquet", ["user_id", ITEM_KEY])
+     except FileNotFoundError:
+         df = pd.read_parquet(_hf_download("parquet/reviews.parquet"), columns=["user_id", ITEM_KEY])
+     return set(df[df["user_id"] == user_id][ITEM_KEY].tolist())
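
The loaders above all follow the same local-first, Hub-fallback pattern; a standalone sketch of that pattern (the helper name load_parquet_local_or_hub is hypothetical and not part of this commit; it assumes _hf_download from src/utils/paths.py):

from pathlib import Path
import pandas as pd
from src.utils.paths import _hf_download  # cached hf_hub_download wrapper updated in this commit

def load_parquet_local_or_hub(proc: Path, name: str, columns=None) -> pd.DataFrame:
    # Hypothetical sketch: read a parquet from the local processed dir when present,
    # otherwise pull "parquet/<name>" from the HF dataset repo.
    local = proc / name
    if local.exists():
        return pd.read_parquet(local, columns=columns)
    return pd.read_parquet(_hf_download(f"parquet/{name}"), columns=columns)
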
src/utils/paths.py CHANGED
@@ -5,7 +5,7 @@ import os
  from huggingface_hub import hf_hub_download

  # --- Constants ---
- HF_REPO = "mayankc/mayankc-amazon_beauty_subset"
+ HF_REPO = "mickey1976/mayankc-amazon_beauty_subset"
  CACHE: Dict[str, Path] = {}

  # --- project roots ---
@@ -17,76 +17,77 @@ CACHE_DIR = DATA_DIR / "cache"
  LOGS_DIR = PROJECT_ROOT / "logs"
  MODELS_DIR = PROJECT_ROOT / "src" / "models"

+
  def ensure_dir(path: Union[str, Path]) -> Path:
-     """
-     Ensure a directory exists. Accepts either a str or a pathlib.Path.
-     Returns a pathlib.Path.
-     """
      p = Path(path) if not isinstance(path, Path) else path
      p.mkdir(parents=True, exist_ok=True)
      return p

+
  def get_raw_path(dataset: str) -> Path:
-     """.../data/raw/<dataset>"""
      return ensure_dir(RAW_DIR / dataset)

+
  def _hf_download(filename: str) -> Path:
      if filename in CACHE:
          return CACHE[filename]
-     path = hf_hub_download(repo_id=HF_REPO, filename=filename)
+     path = hf_hub_download(repo_id=HF_REPO, filename=filename, repo_type="dataset")
      CACHE[filename] = Path(path)
      return Path(path)

+
  def get_processed_path(dataset: str) -> Path:
-     """
-     For Hugging Face deployment:
-     - If running locally, use local /data/processed/<dataset>
-     - If on Spaces or missing local files, fall back to hf_hub_download
-     Returns the base processed folder (parent of downloaded file).
-     """
      local_path = PROCESSED_DIR / dataset
      if local_path.exists():
          return local_path

-     # fallback to Hugging Face Dataset Hub
-     fallback_file = f"{dataset}/user_text_emb.parquet"
+     # fallback: download any known file to get a valid parent path
+     fallback_file = f"parquet/user_text_emb.parquet"
      fallback_path = _hf_download(fallback_file)
      return fallback_path.parent

+
  def get_logs_path() -> Path:
      return ensure_dir(LOGS_DIR)

+
  def get_dataset_paths(dataset: str) -> Dict[str, Path]:
-     """
-     Returns dictionary of paths for known dataset assets.
-     If local file not found, pulls from Hugging Face Hub.
-     """
      dataset = dataset.lower()
-     processed_dir = get_processed_path(dataset)

-     def resolve_or_download(name: str) -> Path:
-         local = processed_dir / name
+     def resolve_or_download(subfolder: str, name: str) -> Path:
+         local = PROCESSED_DIR / dataset / name
          if local.exists():
              return local
-         return _hf_download(f"{dataset}/{name}")
+         return _hf_download(f"{subfolder}/{name}")

      return {
          "raw": get_raw_path(dataset),
-         "processed": processed_dir,
+         "processed": get_processed_path(dataset),
          "cache": ensure_dir(CACHE_DIR / dataset),
          "logs": get_logs_path(),

-         # Parquet input files
-         "item_meta_emb_path": resolve_or_download("item_meta_emb.parquet"),
-         "item_image_emb_path": resolve_or_download("item_image_emb.parquet"),
-         "item_text_emb_path": resolve_or_download("item_text_emb.parquet"),
+         # JSON and config files
+         "defaults": resolve_or_download("json", "defaults.json"),
+         "item_ids": resolve_or_download("json", "item_ids.json"),
+         "user_seq": resolve_or_download("json", "user_seq.json"),
+
+         # Parquet files
+         "item_meta_emb": resolve_or_download("parquet", "item_meta_emb.parquet"),
+         "item_image_emb": resolve_or_download("parquet", "item_image_emb.parquet"),
+         "item_text_emb": resolve_or_download("parquet", "item_text_emb.parquet"),
+         "user_text_emb": resolve_or_download("parquet", "user_text_emb.parquet"),
+
+         # NPY files
+         "text": resolve_or_download("npy", "text.npy"),
+         "image": resolve_or_download("npy", "image.npy"),
+         "meta": resolve_or_download("npy", "meta.npy"),
+         "cove": resolve_or_download("npy", "cove.npy"),

-         # FAISS-related npy features
-         "meta_features_path": resolve_or_download("meta_features.npy"),
-         "text_features_path": resolve_or_download("text_features.npy"),
-         "image_features_path": resolve_or_download("image_features.npy"),
-         "labels_path": resolve_or_download("labels.json"),
+         # FAISS files
+         "faiss_concat": resolve_or_download("faiss", "items_beauty_concat.faiss"),
+         "faiss_weighted": resolve_or_download("faiss", "items_beauty_weighted.faiss"),

-         # ✅ FAISS fusion output path
-         "faiss_fusion_path": resolve_or_download("faiss_fusion.index"),
+         # Model
+         "adapter_model": resolve_or_download("model", "adapter_model.safetensors"),
+         "full_model": resolve_or_download("model", "model.safetensors"),
      }
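
A usage sketch for the updated path resolution (the dataset name "beauty" is an assumption inferred from the FAISS filenames above; the first call may download files from the HF dataset repo into the local hub cache):

from src.utils.paths import get_dataset_paths

# Local files under data/processed/beauty are used when present;
# otherwise each entry resolves to a file downloaded from
# mickey1976/mayankc-amazon_beauty_subset (repo_type="dataset").
paths = get_dataset_paths("beauty")
print(paths["user_text_emb"])   # parquet with per-user text embeddings
print(paths["faiss_weighted"])  # FAISS index for the weighted fusion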