File size: 6,440 Bytes
67367c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import json
import logging
import os
from typing import Any, Dict, List, Optional

import numpy as np

# Module-level logger for the recommender subsystem.
logger = logging.getLogger("plutus.recommender")
# NOTE(review): calling basicConfig at import time configures the root logger
# for the whole application — confirm this is intended for a library module.
logging.basicConfig(level=logging.INFO)


# Sentence-transformers model used for embeddings (overridable via env var).
_EMB_MODEL_NAME = os.getenv("EMB_MODEL_NAME", "sentence-transformers/all-MiniLM-L6-v2")


# Persistence locations for the FAISS index and its metadata; HF_HOME is used
# as the cache root when set (e.g. on a Hugging Face Space).
_CACHE_DIR = os.getenv("HF_HOME", "/home/user/app")
_INDEX_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_index.faiss")
_META_FILE = os.path.join(_CACHE_DIR, "plutus_recommend_meta.json")

# Optional heavy dependencies: a failed import only warns here so the module
# can still be imported; actual use fails later with a clearer error.
try:
    from sentence_transformers import SentenceTransformer
    import faiss
except Exception:
    logger.warning(" sentence-transformers or faiss not installed. Ensure both are in requirements.txt")


class Recommender:
    """
    Embedding-based semantic recommender for Plutus topics.

    Loads topic -> {docs, videos} mappings from recommend.json and builds a
    FAISS inner-product index over L2-normalized sentence embeddings (i.e.
    cosine similarity) for fast semantic search over the resources.
    """

    def __init__(
        self,
        recommend_json_path: str,
        emb_model_name: str = _EMB_MODEL_NAME,
        index_path: str = _INDEX_FILE,
        meta_path: str = _META_FILE,
    ):
        """
        Args:
            recommend_json_path: Path to the recommend.json topic map.
            emb_model_name: sentence-transformers model name used to encode.
            index_path: Where the FAISS index is persisted / loaded from.
            meta_path: Where the per-vector metadata (JSON list) lives.

        Raises:
            FileNotFoundError: If recommend.json does not exist.
            RuntimeError: If the embedding model cannot be loaded.
        """
        self.recommend_json_path = recommend_json_path
        self.emb_model_name = emb_model_name
        self.index_path = index_path
        self.meta_path = meta_path
        self.model = None
        self.index = None
        self.meta: List[Dict[str, Any]] = []
        self.topics_map: Dict[str, Any] = {}

        self._load_json()
        self._maybe_init_embedding_model()

        # Reuse a previously persisted index when both files are present;
        # otherwise defer building until first use.
        if os.path.exists(self.index_path) and os.path.exists(self.meta_path):
            try:
                self._load_index()
            except Exception:
                logger.exception("Index load failed — will rebuild on demand.")
        else:
            logger.info("No index found — will build when first used.")

    def _load_json(self):
        """Load recommend.json into ``self.topics_map``.

        Raises:
            FileNotFoundError: If the file is missing.
        """
        if not os.path.exists(self.recommend_json_path):
            raise FileNotFoundError(f"recommend.json not found at: {self.recommend_json_path}")
        with open(self.recommend_json_path, "r", encoding="utf-8") as f:
            self.topics_map = json.load(f)
        logger.info("Loaded recommend.json with %d topics.", len(self.topics_map))

    def _maybe_init_embedding_model(self):
        """Lazily load the sentence-transformers model (idempotent).

        Raises:
            RuntimeError: If the model or its dependencies are unavailable.
        """
        if self.model is None:
            try:
                # Local import: the module-level import is best-effort only.
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer(self.emb_model_name)
                logger.info("Loaded embedding model: %s", self.emb_model_name)
            except Exception as e:
                logger.exception("Failed to load embedding model: %s", e)
                raise RuntimeError("Embedding model not available. Please check dependencies.") from e

    def build_index(self, force: bool = False):
        """
        Build the FAISS index from recommend.json.

        Each doc and video URL becomes one searchable vector. Cosine
        similarity is obtained by L2-normalizing the embeddings and using an
        inner-product index. Persisting to disk is best-effort.

        Args:
            force: Rebuild even if an index is already held in memory.

        Raises:
            ValueError: If recommend.json contains no docs/videos.
            RuntimeError: If FAISS index construction fails.
        """
        if self.index is not None and not force:
            logger.info("Index already built; skipping rebuild.")
            return

        # Guard: ensure the encoder exists even if initialization was
        # deferred or a subclass skipped __init__.
        self._maybe_init_embedding_model()
        import faiss  # local import — module-level import may have been skipped

        items: List[Dict[str, Any]] = []
        texts: List[str] = []
        for topic, val in self.topics_map.items():
            for d in val.get("docs", []):
                items.append({"topic": topic, "type": "doc", "url": d})
                texts.append(f"{topic} doc {d}")
            for v in val.get("videos", []):
                items.append({"topic": topic, "type": "video", "url": v})
                texts.append(f"{topic} video {v}")

        if not texts:
            raise ValueError("No docs/videos found in recommend.json to index.")

        logger.info("Encoding %d recommendation entries...", len(texts))
        emb = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        # FAISS requires contiguous float32 input.
        emb = np.ascontiguousarray(emb, dtype=np.float32)
        faiss.normalize_L2(emb)

        try:
            index = faiss.IndexFlatIP(emb.shape[1])
            index.add(emb)
            self.index = index
            self.meta = items

            # Best-effort persistence: read-only filesystems (e.g. hosted
            # Spaces) must not break in-memory indexing.
            try:
                faiss.write_index(self.index, self.index_path)
                with open(self.meta_path, "w", encoding="utf-8") as f:
                    json.dump(self.meta, f, ensure_ascii=False, indent=2)
                logger.info("Saved FAISS index and metadata (%d items).", len(items))
            except Exception:
                logger.warning(" Could not persist index — running in memory only (likely Hugging Face Space).")
        except Exception as e:
            logger.exception("Failed to build FAISS index: %s", e)
            raise RuntimeError(f"Index build failed: {e}") from e

    def _load_index(self):
        """Load the persisted FAISS index and metadata files into memory."""
        import faiss
        self.index = faiss.read_index(self.index_path)
        with open(self.meta_path, "r", encoding="utf-8") as f:
            self.meta = json.load(f)
        logger.info("Loaded FAISS index with %d entries.", len(self.meta))

    def recommend_for_query(self, query: str, top_k: int = 5, topic_boost: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return up to ``top_k`` recommended items for ``query``.

        Uses cosine similarity (via normalized inner product). Duplicate
        (url, type) pairs are collapsed; the index is built on demand.

        Args:
            query: Free-text query to match against indexed entries.
            top_k: Maximum number of results to return.
            topic_boost: Optional topic name; matching items are sorted first.

        Returns:
            A list of dicts with keys ``topic``, ``type``, ``url``, ``score``.
        """
        if self.index is None:
            logger.info("Index not found in memory — building now.")
            self.build_index()

        self._maybe_init_embedding_model()
        import faiss  # local import — module-level import may have been skipped

        q_emb = self.model.encode([query], convert_to_numpy=True)
        q_emb = np.ascontiguousarray(q_emb, dtype=np.float32)
        faiss.normalize_L2(q_emb)

        # Over-fetch so deduplication can still fill top_k results.
        scores, indices = self.index.search(q_emb, top_k * 3)
        results: List[Dict[str, Any]] = []
        seen = set()

        for score, idx in zip(scores[0], indices[0]):
            if idx < 0:  # FAISS pads with -1 when fewer hits exist
                continue
            meta = self.meta[idx]
            key = (meta.get("url"), meta.get("type"))
            if key in seen:
                continue
            seen.add(key)
            results.append({
                "topic": meta.get("topic"),
                "type": meta.get("type"),
                "url": meta.get("url"),
                "score": float(score),
            })
            if len(results) >= top_k:
                break

        if topic_boost:
            # Boosted-topic items first, then by descending similarity.
            boost = topic_boost.lower()
            results.sort(
                key=lambda x: (0 if x["topic"].lower() == boost else 1, -x["score"])
            )
        else:
            results.sort(key=lambda x: -x["score"])

        logger.info("Recommended %d items for query: '%s'", len(results), query)
        return results