import logging
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import List

import faiss
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse
from prometheus_fastapi_instrumentator import Instrumentator
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
| |
|
| | logging.basicConfig(level=logging.INFO) |
| | logger = logging.getLogger(__name__) |
| |
|
| | DATA_DIR = Path("data") |
| | CATALOG_PATH = DATA_DIR / "catalog" / "books_catalog.csv" |
| | EMBEDDINGS_PATH = DATA_DIR / "embeddings_cache.npy" |
| | MODEL_NAME = "all-MiniLM-L6-v2" |
| |
|
| | state = { |
| | "titles": [], |
| | "title_to_idx": {}, |
| | "index": None, |
| | "embeddings": None, |
| | "ratings": [], |
| | "genres": [], |
| | "model": None, |
| | "popular_indices": [] |
| | } |
| |
|
| | @asynccontextmanager |
| | async def lifespan(app: FastAPI): |
| | logger.info("Loading resources...") |
| | start_time = time.time() |
| | |
| | if not CATALOG_PATH.exists() or not EMBEDDINGS_PATH.exists(): |
| | logger.error("Missing catalog or embeddings! Run scripts/1b... first.") |
| | else: |
| | try: |
| | df = pd.read_csv(CATALOG_PATH) |
| | state["titles"] = df['title'].tolist() |
| | state["genres"] = df['genres'].fillna("").tolist() |
| | |
| | raw_ratings = pd.to_numeric(df['rating'], errors='coerce').fillna(3.0) |
| | max_rating = raw_ratings.max() |
| | state["ratings"] = (raw_ratings / max_rating).tolist() if max_rating > 0 else [0.5] * len(df) |
| | |
| | state["title_to_idx"] = {t.lower().strip(): i for i, t in enumerate(state["titles"])} |
| | |
| | state["popular_indices"] = np.argsort(raw_ratings)[::-1][:50].tolist() |
| | |
| | logger.info("Loading embeddings...") |
| | embeddings = np.load(EMBEDDINGS_PATH) |
| | state["embeddings"] = embeddings |
| | |
| | OPTIMIZED_INDEX_PATH = DATA_DIR / "index" / "optimized.index" |
| | |
| | if OPTIMIZED_INDEX_PATH.exists(): |
| | logger.info("Loading OPTIMIZED FAISS index (IVF-PQ)...") |
| | state["index"] = faiss.read_index(str(OPTIMIZED_INDEX_PATH)) |
| | state["index"].nprobe = 10 |
| | else: |
| | logger.info("Building Standard FAISS index (Flat)...") |
| | d = embeddings.shape[1] |
| | index = faiss.IndexFlatIP(d) |
| | faiss.normalize_L2(embeddings) |
| | index.add(embeddings) |
| | state["index"] = index |
| | |
| | logger.info(f"Loading Semantic Model ({MODEL_NAME})...") |
| | state["model"] = SentenceTransformer(MODEL_NAME) |
| | |
| | logger.info(f"Ready! Loaded {len(state['titles'])} books in {time.time() - start_time:.2f}s") |
| | except Exception as e: |
| | logger.error(f"Failed to load resources: {e}") |
| | |
| | yield |
| | |
| | logger.info("Shutting down...") |
| |
|
| | app = FastAPI(title="Semantic Book Discovery Engine", lifespan=lifespan) |
| |
|
| | from fastapi.responses import RedirectResponse |
| |
|
| | @app.get("/") |
| | async def read_root(): |
| | return RedirectResponse(url="/docs") |
| |
|
| | Instrumentator().instrument(app).expose(app) |
| |
|
| | class RecommendationRequest(BaseModel): |
| | user_history: List[str] |
| | top_k: int = 10 |
| |
|
| | class SearchRequest(BaseModel): |
| | query: str |
| | top_k: int = 10 |
| |
|
| | class BookResponse(BaseModel): |
| | title: str |
| | score: float |
| | genres: str |
| |
|
| | @app.post("/search", response_model=List[BookResponse]) |
| | async def search(request: SearchRequest): |
| | if state["model"] is None or state["index"] is None: |
| | raise HTTPException(status_code=503, detail="Service loading...") |
| | |
| | query_vector = state["model"].encode([request.query], convert_to_numpy=True) |
| | faiss.normalize_L2(query_vector) |
| | |
| | scores, indices = state["index"].search(query_vector, request.top_k) |
| | |
| | results = [] |
| | for score, idx in zip(scores[0], indices[0]): |
| | results.append(BookResponse( |
| | title=state["titles"][idx], |
| | score=float(score), |
| | genres=str(state["genres"][idx]) |
| | )) |
| | |
| | return results |
| |
|
| | @app.post("/personalize/recommend", response_model=List[BookResponse]) |
| | async def recommend(request: RecommendationRequest): |
| | if state["index"] is None: |
| | raise HTTPException(status_code=503, detail="Service not ready") |
| | |
| | valid_indices = [] |
| | for title in request.user_history: |
| | normalized_title = title.lower().strip() |
| | if normalized_title in state["title_to_idx"]: |
| | valid_indices.append(state["title_to_idx"][normalized_title]) |
| | |
| | if not valid_indices: |
| | logger.info("Cold start user: returning popular books") |
| | results = [] |
| | for idx in state["popular_indices"][:request.top_k]: |
| | results.append(BookResponse( |
| | title=state["titles"][idx], |
| | score=state["ratings"][idx], |
| | genres=str(state["genres"][idx]) |
| | )) |
| | return results |
| | |
| | history_vectors = state["embeddings"][valid_indices] |
| | |
| | n = len(valid_indices) |
| | decay_factor = 0.9 |
| | weights = np.array([decay_factor ** (n - 1 - i) for i in range(n)]) |
| | weights = weights / weights.sum() |
| | |
| | user_vector = np.average(history_vectors, axis=0, weights=weights).reshape(1, -1).astype(np.float32) |
| | faiss.normalize_L2(user_vector) |
| | |
| | search_k = (request.top_k * 3) + len(valid_indices) |
| | scores, indices = state["index"].search(user_vector, search_k) |
| | |
| | results = [] |
| | seen_indices = set(valid_indices) |
| | seen_titles = set() |
| | |
| | for score, idx in zip(scores[0], indices[0]): |
| | if idx in seen_indices: continue |
| | title = state["titles"][idx] |
| | if title in seen_titles: continue |
| | seen_titles.add(title) |
| | |
| | final_score = float(score) + (state["ratings"][idx] * 0.1) |
| | |
| | results.append(BookResponse( |
| | title=title, |
| | score=final_score, |
| | genres=str(state["genres"][idx]) |
| | )) |
| | |
| | if len(results) >= request.top_k: |
| | break |
| | |
| | results.sort(key=lambda x: x.score, reverse=True) |
| | |
| | return results |