siddhm11 commited on
Commit
003b415
Β·
1 Parent(s): 02df9f5

Phase 6.5 Day 4: Cluster snapshot versioning (B3)

Browse files

db.py:
- Add cluster_snapshots table DDL (append-only history, content-addressed)
PK: (user_id, snapshot_id, cluster_idx)
Indexes: user+date (retrospective queries), paper_ids_hash (LLM cache dedup)
- save_cluster_snapshot(): append one row per cluster per recluster event
Computes paper_ids_hash = sha256(sorted(paper_ids))[:16]
- prune_old_snapshots(): delete rows older than retention_days (default 30)
- Add imports: hashlib, json, uuid

recommendations.py:
- After save_clusters_to_db(), call db.save_cluster_snapshot()
Wrapped in try/except so snapshot failure is non-fatal

main.py:
- Call db.prune_old_snapshots(30) on startup (simple MVP, not APScheduler)

Tests: 203 passed, 0 failures

Files changed (3) hide show
  1. app/db.py +65 -0
  2. app/main.py +7 -0
  3. app/routers/recommendations.py +16 -0
app/db.py CHANGED
@@ -13,6 +13,9 @@ Phase 4.5 instrumentation columns (interactions table):
13
  cluster_id – which interest cluster served this paper (NULL if N/A)
14
  """
15
  import aiosqlite
 
 
 
16
  from app.config import DB_PATH
17
 
18
  # ── DDL ───────────────────────────────────────────────────────────────────────
@@ -88,6 +91,24 @@ CREATE TABLE IF NOT EXISTS user_onboarding (
88
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
89
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
90
  );
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  """
92
 
93
 
@@ -470,3 +491,47 @@ async def get_user_category_filter(user_id: str) -> set[str]:
470
  return set()
471
  from app.config import expand_category_groups
472
  return expand_category_groups(state["selected_categories"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  cluster_id – which interest cluster served this paper (NULL if N/A)
14
  """
15
  import aiosqlite
16
+ import hashlib
17
+ import json
18
+ import uuid as _uuid
19
  from app.config import DB_PATH
20
 
21
  # ── DDL ───────────────────────────────────────────────────────────────────────
 
91
  created_at TEXT NOT NULL DEFAULT (datetime('now')),
92
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
93
  );
94
+
95
+ -- Phase 6.5 B3: Append-only cluster history (current-state still in user_clusters)
96
+ CREATE TABLE IF NOT EXISTS cluster_snapshots (
97
+ user_id TEXT NOT NULL,
98
+ snapshot_id TEXT NOT NULL, -- UUID, one per recluster event
99
+ cluster_idx INTEGER NOT NULL, -- stable index after Hungarian
100
+ medoid_paper_id TEXT NOT NULL,
101
+ importance REAL NOT NULL,
102
+ paper_ids TEXT NOT NULL, -- JSON array
103
+ medoid_embedding_blob BLOB,
104
+ snapshot_date TEXT NOT NULL DEFAULT (datetime('now')),
105
+ paper_ids_hash TEXT NOT NULL, -- sha256(sorted(paper_ids))[:16]
106
+ PRIMARY KEY (user_id, snapshot_id, cluster_idx)
107
+ );
108
+ CREATE INDEX IF NOT EXISTS idx_snap_user_date
109
+ ON cluster_snapshots(user_id, snapshot_date DESC);
110
+ CREATE INDEX IF NOT EXISTS idx_snap_hash
111
+ ON cluster_snapshots(paper_ids_hash);
112
  """
113
 
114
 
 
491
  return set()
492
  from app.config import expand_category_groups
493
  return expand_category_groups(state["selected_categories"])
494
+
495
+
496
+ # ── Phase 6.5 B3: Cluster snapshot versioning ─────────────────────────────────
497
+
498
+ async def save_cluster_snapshot(user_id: str, clusters: list[dict]) -> str:
499
+ """Append a new snapshot of the user's clusters. Returns snapshot_id.
500
+
501
+ This is purely additive history β€” current-state queries still hit
502
+ user_clusters. Retrospective queries hit cluster_snapshots.
503
+
504
+ Each cluster dict must have: cluster_idx, medoid_paper_id, importance,
505
+ paper_ids (list[str] or JSON string), optionally medoid_embedding_blob.
506
+ """
507
+ snapshot_id = str(_uuid.uuid4())
508
+ async with aiosqlite.connect(DB_PATH) as conn:
509
+ for c in clusters:
510
+ paper_ids = c["paper_ids"]
511
+ if isinstance(paper_ids, str):
512
+ paper_ids = json.loads(paper_ids)
513
+ paper_ids_hash = hashlib.sha256(
514
+ json.dumps(sorted(paper_ids)).encode()
515
+ ).hexdigest()[:16]
516
+ await conn.execute(
517
+ """INSERT INTO cluster_snapshots
518
+ (user_id, snapshot_id, cluster_idx, medoid_paper_id,
519
+ importance, paper_ids, medoid_embedding_blob, paper_ids_hash)
520
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
521
+ (user_id, snapshot_id, c["cluster_idx"], c["medoid_paper_id"],
522
+ c["importance"], json.dumps(paper_ids),
523
+ c.get("medoid_embedding_blob"), paper_ids_hash),
524
+ )
525
+ await conn.commit()
526
+ return snapshot_id
527
+
528
+
529
+ async def prune_old_snapshots(retention_days: int = 30) -> int:
530
+ """Delete cluster snapshots older than retention_days. Returns rows deleted."""
531
+ async with aiosqlite.connect(DB_PATH) as conn:
532
+ cur = await conn.execute(
533
+ "DELETE FROM cluster_snapshots WHERE snapshot_date < datetime('now', ?)",
534
+ (f"-{retention_days} days",),
535
+ )
536
+ await conn.commit()
537
+ return cur.rowcount
app/main.py CHANGED
@@ -33,6 +33,13 @@ async def lifespan(app: FastAPI):
33
  print("[main] BGE-M3 model loaded β€” hybrid search ready")
34
  except Exception as e:
35
  print(f"[main] BGE-M3 not loaded ({e}) β€” search will fall back to arXiv API")
 
 
 
 
 
 
 
36
  yield
37
 
38
 
 
33
  print("[main] BGE-M3 model loaded β€” hybrid search ready")
34
  except Exception as e:
35
  print(f"[main] BGE-M3 not loaded ({e}) β€” search will fall back to arXiv API")
36
+ # Phase 6.5 B3: Prune old cluster snapshots (>30 days)
37
+ try:
38
+ pruned = await db.prune_old_snapshots(retention_days=30)
39
+ if pruned:
40
+ print(f"[main] Pruned {pruned} old cluster snapshot rows")
41
+ except Exception as e:
42
+ print(f"[main] Snapshot pruning skipped: {e}")
43
  yield
44
 
45
 
app/routers/recommendations.py CHANGED
@@ -268,6 +268,22 @@ async def _multi_interest_recommend(
268
 
269
  await save_clusters_to_db(user_id, clusters)
270
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271
  # ── Step 2: Quota allocation ───────────────────────────────────────
272
  importances = [c.importance for c in clusters]
273
  quotas = allocate_quotas(importances, total_slots=100, min_slots=3)
 
268
 
269
  await save_clusters_to_db(user_id, clusters)
270
 
271
+ # Phase 6.5 B3: append snapshot for cluster history (non-blocking)
272
+ try:
273
+ import numpy as _np
274
+ await db.save_cluster_snapshot(user_id, [
275
+ {
276
+ "cluster_idx": c.cluster_idx,
277
+ "medoid_paper_id": c.medoid_paper_id,
278
+ "importance": c.importance,
279
+ "paper_ids": c.paper_ids,
280
+ "medoid_embedding_blob": c.medoid_embedding.astype(_np.float32).tobytes(),
281
+ }
282
+ for c in clusters
283
+ ])
284
+ except Exception as e:
285
+ print(f"[recommendations] cluster snapshot save failed (non-fatal): {e}")
286
+
287
  # ── Step 2: Quota allocation ───────────────────────────────────────
288
  importances = [c.importance for c in clusters]
289
  quotas = allocate_quotas(importances, total_slots=100, min_slots=3)