| """ | |
| src/services/clustering.py — Phase 3: HDBSCAN face clustering (People View) | |
| Clusters all face vectors in the faces-arcface Pinecone index using HDBSCAN, | |
| then stores cluster assignments in Supabase (face_clusters table). | |
| Algorithm choice: | |
| - HDBSCAN on ArcFace 512-d vectors (euclidean after L2 normalisation) | |
| - min_cluster_size=3, min_samples=3, cluster_selection_epsilon=0.35 | |
| - Noise points (label=-1) are left unclustered — not forced into clusters | |
| - Representative face = the vector closest to the cluster centroid | |
| Pinecone fetch strategy: | |
| - Pinecone free tier has no "list all vectors" endpoint | |
| - We use a dummy query with random vectors + large top_k to page through | |
| vectors. This is imperfect but works within free-tier constraints. | |
| - Production alternative: store vector_ids in Supabase on upload (Phase 4) | |
| Entry points: | |
| run_clustering(pc, user_id, keys) — full re-cluster, called by API endpoint | |
| get_people(user_id) — read cluster list from Supabase | |
| get_person_images(cluster_id, user_id) — images for one cluster | |
| rename_cluster(cluster_id, name, user_id) — label "Mom", "John", etc. | |
| """ | |
import asyncio
import uuid
from datetime import datetime, timezone

import aiohttp
import numpy as np

from src.core.config import (
    IDX_FACES_ARCFACE,
    SUPABASE_URL, SUPABASE_SERVICE_KEY,
    CLUSTER_MIN_SAMPLES, CLUSTER_MIN_CLUSTER_SIZE, CLUSTER_EPSILON,
    FACE_SEARCH_TOP_K, CLUSTERING_BLUR_THRESHOLD,
)
from src.common.utils import cld_face_thumb_url
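
# Supabase tables written by this module (columns listed as used below; the actual
# schema lives in Supabase and is assumed to match):
#   face_clusters:        cluster_id, user_id, name, face_count,
#                         representative_face_crop, representative_vector_id,
#                         created_at, updated_at
#   face_vector_clusters: vector_id, cluster_id, user_id, image_url, folder,
#                         face_crop, updated_at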

# ──────────────────────────────────────────────────────────────
# Supabase helpers
# ──────────────────────────────────────────────────────────────

def _hdr() -> dict:
    """Common auth headers for the Supabase REST (PostgREST) API."""
    return {
        "apikey": SUPABASE_SERVICE_KEY,
        "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=representation",
    }


async def _supa_upsert(table: str, rows: list[dict]) -> None:
    """Inserts rows into `table`, merging on conflict (resolution=merge-duplicates)."""
    if not SUPABASE_URL or not rows:
        return
    url = f"{SUPABASE_URL}/rest/v1/{table}"
    headers = {**_hdr(), "Prefer": "resolution=merge-duplicates,return=minimal"}
    async with aiohttp.ClientSession() as s:
        await s.post(url, headers=headers, json=rows)


async def _supa_select(table: str, filters: str = "") -> list[dict]:
    """Returns rows from `table` matching a raw PostgREST filter string."""
    if not SUPABASE_URL:
        return []
    url = f"{SUPABASE_URL}/rest/v1/{table}?{filters}"
    async with aiohttp.ClientSession() as s:
        async with s.get(url, headers=_hdr()) as r:
            if r.status == 200:
                return await r.json()
    return []


async def _supa_patch(table: str, filters: str, patch: dict) -> None:
    """Applies `patch` to all rows in `table` matching `filters`."""
    if not SUPABASE_URL:
        return
    url = f"{SUPABASE_URL}/rest/v1/{table}?{filters}"
    async with aiohttp.ClientSession() as s:
        await s.patch(url, headers=_hdr(), json=patch)


async def _supa_delete(table: str, filters: str) -> None:
    """Deletes all rows in `table` matching `filters`."""
    if not SUPABASE_URL:
        return
    url = f"{SUPABASE_URL}/rest/v1/{table}?{filters}"
    async with aiohttp.ClientSession() as s:
        await s.delete(url, headers=_hdr())
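
# Filter strings passed to the helpers above use Supabase/PostgREST query syntax.
# Examples mirroring the calls made later in this module (the user_id value is illustrative):
#   await _supa_select("face_clusters", "user_id=eq.1234&order=face_count.desc")
#   await _supa_delete("face_vector_clusters", "user_id=eq.1234")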

# ──────────────────────────────────────────────────────────────
# Pinecone vector fetch helpers
# ──────────────────────────────────────────────────────────────

def _fetch_all_vectors(idx, dim: int = 512, max_vectors: int = 10000) -> list[dict]:
    """
    Fetches as many vectors as possible from a Pinecone index using
    random-probe queries. Free-tier Pinecone has no scan endpoint, so
    we use diverse random probes to discover vectors.

    Returns a list of dicts: {id, values, metadata}
    """
    seen_ids: set = set()
    collected: list[dict] = []
    rng = np.random.default_rng(seed=42)

    # 20 random probes — covers most of the index for typical gallery sizes
    for _ in range(20):
        probe = rng.standard_normal(dim).astype(np.float32)
        probe /= np.linalg.norm(probe)
        res = idx.query(
            vector=probe.tolist(),
            top_k=min(FACE_SEARCH_TOP_K, 1000),
            include_metadata=True,
            include_values=True,
        )
        for match in res.get("matches", []):
            vid = match["id"]
            if vid in seen_ids:
                continue
            seen_ids.add(vid)
            values = match.get("values")
            if values:
                collected.append({
                    "id": vid,
                    "values": values,
                    "metadata": match.get("metadata", {}),
                })
            if len(collected) >= max_vectors:
                break
        if len(collected) >= max_vectors:
            break
    return collected
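
# Example item returned by _fetch_all_vectors (illustrative values; the metadata keys
# are the ones this module reads, the id format and numbers are made up):
#   {"id": "vec_ab12cd34",
#    "values": [0.013, -0.042, ...],   # 512 floats
#    "metadata": {"url": "https://...", "folder": "family",
#                 "face_crop": "https://...", "blur_score": 143.2}}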

# ──────────────────────────────────────────────────────────────
# Core clustering logic
# ──────────────────────────────────────────────────────────────

def _run_hdbscan(vectors: np.ndarray) -> np.ndarray:
    """
    Runs HDBSCAN on the provided L2-normalised 512-d face vectors.
    Returns an integer label array (-1 = noise / unclustered).
    """
    try:
        import hdbscan
    except ImportError as exc:
        raise RuntimeError(
            "hdbscan not installed. Add hdbscan>=0.8.33 to requirements.txt"
        ) from exc

    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=CLUSTER_MIN_CLUSTER_SIZE,
        min_samples=CLUSTER_MIN_SAMPLES,
        cluster_selection_epsilon=CLUSTER_EPSILON,
        metric="euclidean",
        core_dist_n_jobs=1,  # HF CPU — avoid multiprocessing overhead
    )
    clusterer.fit(vectors)
    return clusterer.labels_
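
# Usage sketch (label values illustrative):
#   labels = _run_hdbscan(matrix)   # matrix: (N, 512) float32, rows L2-normalised
#   # labels -> array([0, 0, 1, -1, 1, 0, ...]); -1 marks noise left unclustered
#
# Note on the metric: for unit vectors, ||a - b||^2 = 2 - 2*cos(a, b), so euclidean
# distance on L2-normalised embeddings is a monotone proxy for cosine distance. The
# default cluster_selection_epsilon of 0.35 (see module docstring) therefore
# corresponds, loosely, to a cosine similarity of about 1 - 0.35**2 / 2 ≈ 0.94.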

def _pick_representative(cluster_vecs: np.ndarray, cluster_meta: list[dict]) -> int:
    """
    Picks the representative face for a cluster: the face closest to the cluster
    centroid among the non-blurry faces (blur_score >= CLUSTERING_BLUR_THRESHOLD).
    Falls back to the sharpest face if every face is blurry.
    Returns the index of that face within the cluster arrays.
    """
    centroid = cluster_vecs.mean(axis=0)
    centroid /= np.linalg.norm(centroid) + 1e-8
    # Vectors are L2-normalised, so the dot product is cosine similarity to the centroid
    sims = cluster_vecs @ centroid

    # Walk faces from most to least central and take the first non-blurry one
    sorted_indices = np.argsort(sims)[::-1]  # highest similarity first
    for idx in sorted_indices:
        blur_score = cluster_meta[int(idx)].get("blur_score", 100.0)
        if blur_score >= CLUSTERING_BLUR_THRESHOLD:
            return int(idx)

    # Fallback: if all faces are blurry, pick the sharpest one
    return max(range(len(cluster_meta)), key=lambda i: cluster_meta[i].get("blur_score", 0))

# ──────────────────────────────────────────────────────────────
# Public entry points
# ──────────────────────────────────────────────────────────────

async def run_clustering(pc, user_id: str) -> dict:
    """
    Full re-cluster pipeline:
      1. Fetch all ArcFace vectors from Pinecone
      2. Run HDBSCAN
      3. Delete the user's previous clusters (full re-cluster)
      4. Write cluster assignments to face_clusters and per-vector
         assignments to face_vector_clusters in Supabase
    Returns a summary dict.
    """
    idx = pc.Index(IDX_FACES_ARCFACE)

    # 1. Fetch vectors (blocking — run in thread pool)
    raw = await asyncio.to_thread(_fetch_all_vectors, idx)
    if len(raw) < CLUSTER_MIN_CLUSTER_SIZE:
        return {"status": "skipped", "reason": "not enough vectors", "vectors": len(raw)}

    ids = [r["id"] for r in raw]
    metas = [r["metadata"] for r in raw]

    # Filter out blurry faces before clustering
    valid_indices = [
        i for i, meta in enumerate(metas)
        if meta.get("blur_score", 100.0) >= CLUSTERING_BLUR_THRESHOLD
    ]
    if len(valid_indices) < CLUSTER_MIN_CLUSTER_SIZE:
        return {
            "status": "skipped",
            "reason": f"only {len(valid_indices)} non-blurry vectors after blur filtering",
            "vectors": len(raw),
            "valid_vectors": len(valid_indices),
        }

    ids = [ids[i] for i in valid_indices]
    metas = [metas[i] for i in valid_indices]
    matrix = np.array([raw[i]["values"] for i in valid_indices], dtype=np.float32)

    # L2-normalise before euclidean HDBSCAN (equivalent to angular distance)
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    matrix = matrix / (norms + 1e-8)

    # 2. Cluster (blocking)
    labels = await asyncio.to_thread(_run_hdbscan, matrix)
    unique_labels = set(labels) - {-1}
    now_iso = datetime.now(timezone.utc).isoformat()

    # 3. Delete existing clusters for this user (full re-cluster)
    await _supa_delete("face_clusters", f"user_id=eq.{user_id}")
    await _supa_delete("face_vector_clusters", f"user_id=eq.{user_id}")

    cluster_rows = []
    vector_rows = []
    for label in sorted(unique_labels):
        cluster_id = str(uuid.uuid4())
        c_indices = np.where(labels == label)[0]
        c_vecs = matrix[c_indices]
        c_meta = [metas[i] for i in c_indices]
        c_ids = [ids[i] for i in c_indices]

        # Representative face: the non-blurry face closest to the cluster centroid
        rep_idx = _pick_representative(c_vecs, c_meta)
        rep_meta = c_meta[rep_idx]

        cluster_rows.append({
            "cluster_id": cluster_id,
            "user_id": user_id,
            "representative_face_crop": rep_meta.get("face_crop", ""),
            "representative_vector_id": c_ids[rep_idx],
            "face_count": int(len(c_indices)),
            "name": None,
            "created_at": now_iso,
            "updated_at": now_iso,
        })
        for vid, meta in zip(c_ids, c_meta):
            vector_rows.append({
                "vector_id": vid,
                "cluster_id": cluster_id,
                "user_id": user_id,
                "image_url": meta.get("url", ""),
                "folder": meta.get("folder", ""),
                "face_crop": meta.get("face_crop", ""),
                "updated_at": now_iso,
            })

    # 4. Batch write to Supabase (200 rows per request)
    for i in range(0, len(cluster_rows), 200):
        await _supa_upsert("face_clusters", cluster_rows[i:i + 200])
    for i in range(0, len(vector_rows), 200):
        await _supa_upsert("face_vector_clusters", vector_rows[i:i + 200])

    return {
        "status": "ok",
        "total_vectors": len(ids),
        "clusters_found": len(unique_labels),
        "noise_vectors": int(np.sum(labels == -1)),
    }
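
# Hypothetical wiring sketch (the web framework, route and auth dependency are
# assumptions, not defined in this module):
#
#   @app.post("/api/people/recluster")
#   async def recluster_endpoint(user_id: str = Depends(get_current_user)):
#       return await run_clustering(pc, user_id)
#
# A successful run returns a summary such as (counts illustrative):
#   {"status": "ok", "total_vectors": 412, "clusters_found": 17, "noise_vectors": 38}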


async def get_people(user_id: str) -> list[dict]:
    """Returns all identity clusters for a user, ordered by face_count desc."""
    rows = await _supa_select(
        "face_clusters",
        f"user_id=eq.{user_id}&order=face_count.desc",
    )
    return [
        {
            "cluster_id": r["cluster_id"],
            "name": r.get("name"),
            "face_count": r.get("face_count", 0),
            "representative_face_crop": r.get("representative_face_crop", ""),
        }
        for r in rows
    ]


async def get_person_images(cluster_id: str, user_id: str) -> list[dict]:
    """Returns all images belonging to a cluster."""
    rows = await _supa_select(
        "face_vector_clusters",
        f"cluster_id=eq.{cluster_id}&user_id=eq.{user_id}",
    )
    # Dedupe by image_url (multiple face vectors can come from the same image)
    seen: set = set()
    out = []
    for r in rows:
        url = r.get("image_url", "")
        if url and url not in seen:
            seen.add(url)
            out.append({
                "url": url,
                "thumb_url": cld_face_thumb_url(url),
                "folder": r.get("folder", ""),
                "face_crop": r.get("face_crop", ""),
            })
    return out


async def rename_cluster(cluster_id: str, name: str, user_id: str) -> bool:
    """Assigns a human-readable name to a cluster ('Mom', 'John', etc.)."""
    await _supa_patch(
        "face_clusters",
        f"cluster_id=eq.{cluster_id}&user_id=eq.{user_id}",
        {"name": name, "updated_at": datetime.now(timezone.utc).isoformat()},
    )
    return True


async def search_cluster_aware(
    pc, image_map: dict, user_id: str
) -> dict:
    """
    Cluster-aware search expansion (Phase 3 recall win).

    Given an initial image_map from search_faces_split, look up which
    clusters the matched faces belong to, then return ALL images in those
    clusters. This achieves near-100% recall for well-indexed people.

    Returns an expanded image_map in the same format as search_faces_split.
    """
    if not image_map:
        return image_map

    # Find which vector_ids were returned in the initial search
    matched_vids = {v.get("vector_id") for v in image_map.values() if v.get("vector_id")}
    if not matched_vids:
        return image_map

    # Look up cluster membership for those vector_ids
    vid_list = ",".join(f'"{v}"' for v in matched_vids)
    rows = await _supa_select(
        "face_vector_clusters",
        f"vector_id=in.({vid_list})&user_id=eq.{user_id}",
    )
    if not rows:
        return image_map

    # Collect all cluster_ids matched
    cluster_ids = {r["cluster_id"] for r in rows}

    # Cluster-expanded results get a score slightly below the weakest direct hit,
    # so they always sort after direct matches
    min_score = min((v["fused_score"] for v in image_map.values()), default=0.3)
    expansion_score = max(min_score - 0.01, 0.01)

    # Fetch all images in those clusters and merge them in
    expanded = dict(image_map)
    for cluster_id in cluster_ids:
        cluster_images = await get_person_images(cluster_id, user_id)
        for img in cluster_images:
            url = img["url"]
            if url not in expanded:
                expanded[url] = {
                    "fused_score": expansion_score,
                    "arcface_score": 0.0,
                    "adaface_score": 0.0,
                    "raw_score": 0.0,
                    "face_crop": img.get("face_crop", ""),
                    "folder": img.get("folder", "uncategorized"),
                    "vector_id": None,
                    "cluster_expanded": True,
                }
    return expanded
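
# Illustrative expansion example (URLs and scores made up): if the direct search
# returned
#   {"imgA.jpg": {"fused_score": 0.82, "vector_id": "v1", ...},
#    "imgB.jpg": {"fused_score": 0.61, "vector_id": "v7", ...}}
# and v1 belongs to a cluster that also contains imgC.jpg, the expanded map gains
#   "imgC.jpg": {"fused_score": 0.60, ..., "vector_id": None, "cluster_expanded": True}
# so every cluster-expanded image sorts just below the weakest direct match.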