""" candidate_filter.py — CPU-only candidate retrieval from a precomputed arXiv pool. The arxiv_pool.parquet (built daily by scripts/update_arxiv_pool.py) contains for each paper: arxiv_id, title, abstract, published_at, abstract_embedding (384-dim, all-MiniLM-L6-v2). Embedding the pool is the expensive step — done once per day, off the Space. Per-request work: embed the team-context query (~50ms on CPU), do a cosine top-k against the cached pool embeddings (numpy, ~5ms for 500 candidates). No GPU touched. """ from __future__ import annotations import os from functools import lru_cache from pathlib import Path import numpy as np import pandas as pd from huggingface_hub import hf_hub_download from sentence_transformers import SentenceTransformer EMBEDDER_NAME = "sentence-transformers/all-MiniLM-L6-v2" # TODO: replace with the actual HF dataset once arxiv_pool cron is wired. ARXIV_POOL_REPO = os.environ.get("ARXIV_POOL_REPO", "remyxai/arxiv_pool_daily") ARXIV_POOL_FILE = os.environ.get("ARXIV_POOL_FILE", "arxiv_pool.parquet") _embedder: SentenceTransformer | None = None _pool: pd.DataFrame | None = None _pool_embeddings: np.ndarray | None = None def load_embedder() -> SentenceTransformer: global _embedder if _embedder is None: print(f"[candidate_filter] Loading {EMBEDDER_NAME}…") # device="cpu" is required, not just intentional: SentenceTransformer's # default auto-detects CUDA and moves the model there at __init__, # which initializes CUDA in the parent process — fatal on ZeroGPU # (worker_init then fails with "No CUDA GPUs are available"). _embedder = SentenceTransformer(EMBEDDER_NAME, device="cpu") return _embedder def load_pool() -> pd.DataFrame: """Load the precomputed arxiv pool. Cached for the container lifetime.""" global _pool, _pool_embeddings if _pool is not None: return _pool hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") print(f"[candidate_filter] Downloading {ARXIV_POOL_REPO}/{ARXIV_POOL_FILE}…") try: path = hf_hub_download( repo_id=ARXIV_POOL_REPO, filename=ARXIV_POOL_FILE, repo_type="dataset", token=hf_token, ) _pool = pd.read_parquet(path) except Exception as e: # Fall back to local file for development local = Path(__file__).parent / "arxiv_pool.parquet" if local.exists(): print(f"[candidate_filter] Hub fetch failed ({e}); using local {local}") _pool = pd.read_parquet(local) else: raise RuntimeError( f"No arxiv pool available. Set ARXIV_POOL_REPO or place " f"arxiv_pool.parquet at {local}." ) from e # Materialize embeddings as a contiguous float32 ndarray for fast cosine embs = np.stack(_pool["abstract_embedding"].values).astype(np.float32) # L2-normalize so cosine sim == dot product norms = np.linalg.norm(embs, axis=1, keepdims=True) + 1e-12 _pool_embeddings = embs / norms print(f"[candidate_filter] Pool loaded: {len(_pool):,} papers, {embs.shape[1]}-dim embeddings") return _pool def top_k_candidates(team_ctx: str, k: int = 50) -> pd.DataFrame: """Embed `team_ctx` and return the top-k papers from the pool by cosine similarity. Pure CPU; no GPU contention.""" pool = load_pool() embedder = load_embedder() query = embedder.encode(team_ctx, normalize_embeddings=True).astype(np.float32) sims = _pool_embeddings @ query # (N,) cosine sims top_idx = np.argpartition(-sims, kth=min(k, len(sims) - 1))[:k] # Sort the top-k by similarity (argpartition isn't ordered) top_idx = top_idx[np.argsort(-sims[top_idx])] result = pool.iloc[top_idx].copy() result["similarity"] = sims[top_idx] return result.reset_index(drop=True)