import json
import logging
import os
from typing import Any, Dict, List, Optional

import numpy as np
# Module-level logger for the recommender.
# NOTE(review): calling basicConfig at import time can clobber the host
# application's logging configuration — confirm this module should own it.
logger = logging.getLogger("plutus.recommender")
logging.basicConfig(level=logging.INFO)
# Embedding model and on-disk cache locations, overridable via environment
# variables (HF_HOME defaults to the Hugging Face Space home directory).
_EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")
_CACHE_DIR = os.getenv("HF_HOME", "/home/user/app")
_INDEX_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_index.faiss")
_META_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_meta.json")
# Optional heavy dependencies: import errors are tolerated here so the module
# itself stays importable; methods that actually need them re-import locally
# and fail loudly at that point instead.
try:
    from sentence_transformers import SentenceTransformer
    import faiss
except Exception:
    logger.warning(" sentence-transformers or faiss not installed. Ensure both are in requirements.txt")
class Recommender:
    """
    Embedding-based semantic recommender for Plutus topics.

    Loads topic -> resources mappings from recommend.json and builds a FAISS
    inner-product index over L2-normalized embeddings (i.e. cosine similarity)
    for fast semantic search over the topics' docs and videos.
    """

    def __init__(
        self,
        recommend_json_path: str,
        emb_model_name: str = _EMB_MODEL_NAME,
        index_path: str = _INDEX_FILE,
        meta_path: str = _META_FILE,
    ):
        """
        Args:
            recommend_json_path: Path to recommend.json (topic -> docs/videos map).
            emb_model_name: sentence-transformers model name used for encoding.
            index_path: Where the FAISS index is persisted on disk.
            meta_path: Where the per-vector metadata JSON is persisted.

        Raises:
            FileNotFoundError: If recommend.json does not exist.
            RuntimeError: If the embedding model cannot be loaded.
        """
        self.recommend_json_path = recommend_json_path
        self.emb_model_name = emb_model_name
        self.index_path = index_path
        self.meta_path = meta_path
        self.model = None  # SentenceTransformer, set by _maybe_init_embedding_model
        self.index = None  # FAISS index, built or loaded lazily
        self.meta: List[Dict[str, Any]] = []  # one {topic, type, url} dict per indexed row
        self.topics_map: Dict[str, Any] = {}  # raw contents of recommend.json
        self._load_json()
        self._maybe_init_embedding_model()
        # Reuse a previously persisted index when both files are present;
        # otherwise defer building until first use.
        if os.path.exists(self.index_path) and os.path.exists(self.meta_path):
            try:
                self._load_index()
            except Exception:
                logger.exception("Index load failed — will rebuild on demand.")
        else:
            logger.info("No index found — will build when first used.")

    def _load_json(self):
        """Load recommend.json into self.topics_map.

        Raises:
            FileNotFoundError: If the file is missing.
        """
        if not os.path.exists(self.recommend_json_path):
            raise FileNotFoundError(f"recommend.json not found at: {self.recommend_json_path}")
        with open(self.recommend_json_path, "r", encoding="utf-8") as f:
            self.topics_map = json.load(f)
        logger.info("Loaded recommend.json with %d topics.", len(self.topics_map))

    def _maybe_init_embedding_model(self):
        """Load the sentence-transformers model once; no-op if already loaded.

        Raises:
            RuntimeError: If sentence-transformers is unavailable or the model
                fails to download/load.
        """
        if self.model is None:
            try:
                # Imported here so this module stays importable even when the
                # optional dependency is missing (see top-of-file warning).
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer(self.emb_model_name)
                logger.info("Loaded embedding model: %s", self.emb_model_name)
            except Exception as e:
                logger.exception(f" Failed to load embedding model: {e}")
                # Chain the cause so callers can see why loading failed.
                raise RuntimeError("Embedding model not available. Please check dependencies.") from e

    def build_index(self, force: bool = False):
        """
        Builds FAISS index from recommend.json.

        Each document and video becomes a searchable vector; its metadata
        (topic/type/url) is stored in self.meta, aligned by row index.
        The index and metadata are persisted to disk on a best-effort basis.

        Args:
            force: Rebuild even when an index already exists in memory.

        Raises:
            ValueError: If recommend.json contains no docs/videos to index.
            RuntimeError: If FAISS index construction fails.
        """
        if self.index is not None and not force:
            logger.info("Index already built; skipping rebuild.")
            return
        # Local import for parity with _load_index/recommend_for_query, and to
        # avoid a NameError when the top-level optional import failed.
        import faiss
        items = []
        texts = []
        for topic, val in self.topics_map.items():
            for doc_url in val.get("docs", []):
                items.append({"topic": topic, "type": "doc", "url": doc_url})
                texts.append(f"{topic} doc {doc_url}")
            for video_url in val.get("videos", []):
                items.append({"topic": topic, "type": "video", "url": video_url})
                texts.append(f"{topic} video {video_url}")
        if not texts:
            raise ValueError("No docs/videos found in recommend.json to index.")
        logger.info("Encoding %d recommendation entries...", len(texts))
        emb = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        # Normalize so inner product == cosine similarity.
        faiss.normalize_L2(emb)
        dim = emb.shape[1]
        try:
            index = faiss.IndexFlatIP(dim)
            index.add(emb)
            self.index = index
            self.meta = items
            try:
                faiss.write_index(self.index, self.index_path)
                with open(self.meta_path, "w", encoding="utf-8") as f:
                    json.dump(self.meta, f, ensure_ascii=False, indent=2)
                logger.info("Saved FAISS index and metadata (%d items).", len(items))
            except Exception:
                # Persistence is best-effort: read-only filesystems (e.g. a
                # Hugging Face Space) keep the index in memory only.
                logger.warning(" Could not persist index — running in memory only (likely Hugging Face Space).")
        except Exception as e:
            logger.exception(f" Failed to build FAISS index: {e}")
            raise RuntimeError(f"Index build failed: {e}") from e

    def _load_index(self):
        """Loads the persisted FAISS index and metadata files into memory."""
        import faiss
        self.index = faiss.read_index(self.index_path)
        with open(self.meta_path, "r", encoding="utf-8") as f:
            self.meta = json.load(f)
        logger.info("Loaded FAISS index with %d entries.", len(self.meta))

    def recommend_for_query(self, query: str, top_k: int = 5, topic_boost: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Returns top_k recommended items for `query`.

        Uses cosine similarity (via normalized inner product). Candidates are
        deduplicated on (url, type). When `topic_boost` is given, items whose
        topic matches it (case-insensitively) are ranked first.

        Args:
            query: Free-text query to embed and search with.
            top_k: Maximum number of results to return.
            topic_boost: Optional topic name to promote in the ranking.

        Returns:
            A list of {"topic", "type", "url", "score"} dicts, best first.
        """
        if self.index is None:
            logger.info("Index not found in memory — building now.")
            self.build_index()
        q_emb = self.model.encode([query], convert_to_numpy=True)
        import faiss
        faiss.normalize_L2(q_emb)
        # Over-fetch 3x so deduplication still leaves enough candidates.
        D, I = self.index.search(q_emb, top_k * 3)
        results = []
        seen = set()
        for score, idx in zip(D[0], I[0]):
            # FAISS pads with -1 when fewer than k vectors exist; also guard
            # against a stale persisted index longer than the metadata list.
            if idx < 0 or idx >= len(self.meta):
                continue
            meta = self.meta[idx]
            key = (meta.get("url"), meta.get("type"))
            if key in seen:
                continue
            seen.add(key)
            results.append({
                "topic": meta.get("topic"),
                "type": meta.get("type"),
                "url": meta.get("url"),
                "score": float(score),
            })
            if len(results) >= top_k:
                break
        if topic_boost:
            # Matching topic first, then descending score within each group.
            results.sort(
                key=lambda x: (0 if x["topic"].lower() == topic_boost.lower() else 1, -x["score"])
            )
        else:
            results.sort(key=lambda x: -x["score"])
        logger.info("Recommended %d items for query: '%s'", len(results), query)
        return results