from contextlib import asynccontextmanager
from pathlib import Path
import logging
from sentence_transformers import SentenceTransformer
from prometheus_fastapi_instrumentator import Instrumentator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import pandas as pd
import faiss
import numpy as np
import time
# Module-level logging: INFO level, named per-module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Locations of the pre-built catalog and embedding cache (produced by an
# offline script — see the error message in lifespan()).
DATA_DIR = Path("data")
CATALOG_PATH = DATA_DIR / "catalog" / "books_catalog.csv"
EMBEDDINGS_PATH = DATA_DIR / "embeddings_cache.npy"
# Sentence-transformers model used to embed free-text queries at request time.
MODEL_NAME = "all-MiniLM-L6-v2"
# Mutable application state, populated once at startup by lifespan().
state = {
    "titles": [],            # book titles, row-aligned with the embeddings
    "title_to_idx": {},      # lowercased/stripped title -> catalog row index
    "index": None,           # FAISS index over the book embeddings
    "embeddings": None,      # numpy array of book embeddings
    "ratings": [],           # ratings rescaled to [0, 1]
    "genres": [],            # genre string per book ("" when missing)
    "model": None,           # SentenceTransformer query encoder
    "popular_indices": []    # top-rated row indices for cold-start users
}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load catalog, embeddings, FAISS index and the query-encoder model.

    Resources are stored in the module-level ``state`` dict.  If files are
    missing or loading fails, the app still starts — endpoints answer 503
    through their own readiness checks until ``state`` is populated.
    """
    logger.info("Loading resources...")
    start_time = time.time()
    if not CATALOG_PATH.exists() or not EMBEDDINGS_PATH.exists():
        logger.error("Missing catalog or embeddings! Run scripts/1b... first.")
    else:
        try:
            df = pd.read_csv(CATALOG_PATH)
            state["titles"] = df['title'].tolist()
            state["genres"] = df['genres'].fillna("").tolist()
            # Rescale ratings to [0, 1]; unparseable ratings default to 3.0.
            raw_ratings = pd.to_numeric(df['rating'], errors='coerce').fillna(3.0)
            max_rating = raw_ratings.max()
            state["ratings"] = (raw_ratings / max_rating).tolist() if max_rating > 0 else [0.5] * len(df)
            state["title_to_idx"] = {t.lower().strip(): i for i, t in enumerate(state["titles"])}
            # Top-50 rows by raw rating, used for cold-start recommendations.
            state["popular_indices"] = np.argsort(raw_ratings)[::-1][:50].tolist()
            logger.info("Loading embeddings...")
            embeddings = np.load(EMBEDDINGS_PATH)
            if len(embeddings) != len(df):
                # A stale cache would silently misalign titles and vectors
                # everywhere downstream — fail loudly instead.
                raise ValueError(
                    f"Embeddings/catalog size mismatch: {len(embeddings)} rows vs {len(df)} books"
                )
            # Normalize unconditionally so state["embeddings"] is unit-length
            # regardless of which index branch runs below (previously only the
            # Flat branch normalized, in place).  /personalize builds user
            # vectors from these, so both branches must agree.
            faiss.normalize_L2(embeddings)
            state["embeddings"] = embeddings
            OPTIMIZED_INDEX_PATH = DATA_DIR / "index" / "optimized.index"
            if OPTIMIZED_INDEX_PATH.exists():
                logger.info("Loading OPTIMIZED FAISS index (IVF-PQ)...")
                state["index"] = faiss.read_index(str(OPTIMIZED_INDEX_PATH))
                state["index"].nprobe = 10  # recall/latency trade-off
            else:
                logger.info("Building Standard FAISS index (Flat)...")
                # Inner product on unit vectors == cosine similarity.
                index = faiss.IndexFlatIP(embeddings.shape[1])
                index.add(embeddings)
                state["index"] = index
            logger.info(f"Loading Semantic Model ({MODEL_NAME})...")
            state["model"] = SentenceTransformer(MODEL_NAME)
            logger.info(f"Ready! Loaded {len(state['titles'])} books in {time.time() - start_time:.2f}s")
        except Exception as e:
            # logger.exception keeps the traceback (logger.error drops it).
            logger.exception(f"Failed to load resources: {e}")
    yield
    logger.info("Shutting down...")
# FastAPI application; lifespan() above handles resource load/teardown.
app = FastAPI(title="Semantic Book Discovery Engine", lifespan=lifespan)
# NOTE(review): mid-file import — consider moving it to the top-of-file
# import block to follow PEP 8.
from fastapi.responses import RedirectResponse
@app.get("/")
async def read_root():
    # Convenience redirect: the root URL opens the interactive API docs.
    return RedirectResponse(url="/docs")
# Expose Prometheus metrics at /metrics and instrument all routes.
Instrumentator().instrument(app).expose(app)
class RecommendationRequest(BaseModel):
    """Request body for POST /personalize/recommend."""
    # Book titles the user has read; matched case-insensitively against the
    # catalog.  Assumed ordered oldest -> newest — TODO confirm with callers.
    user_history: List[str]
    # Number of recommendations to return.
    top_k: int = 10
class SearchRequest(BaseModel):
    """Request body for POST /search."""
    # Free-text semantic query.
    query: str
    # Number of results to return.
    top_k: int = 10
class BookResponse(BaseModel):
    """One book result returned by the /search and /personalize endpoints."""
    title: str
    # Similarity score (plus a small popularity boost on /personalize).
    score: float
    genres: str
@app.post("/search", response_model=List[BookResponse])
async def search(request: SearchRequest):
    """Semantic similarity search over the book catalog.

    Encodes the query text, L2-normalizes it and runs a max-inner-product
    search (cosine similarity on unit vectors) against the FAISS index.

    Raises:
        HTTPException: 503 while the model/index are still loading.
    """
    if state["model"] is None or state["index"] is None:
        raise HTTPException(status_code=503, detail="Service loading...")
    query_vector = state["model"].encode([request.query], convert_to_numpy=True)
    faiss.normalize_L2(query_vector)
    scores, indices = state["index"].search(query_vector, request.top_k)
    results = []
    for score, idx in zip(scores[0], indices[0]):
        # FAISS pads with -1 when it finds fewer than top_k neighbours
        # (common with IVF indexes at low nprobe); without this guard the
        # -1 would silently index the *last* catalog entry.
        if idx < 0:
            continue
        results.append(BookResponse(
            title=state["titles"][idx],
            score=float(score),
            genres=str(state["genres"][idx])
        ))
    return results
@app.post("/personalize/recommend", response_model=List[BookResponse])
async def recommend(request: RecommendationRequest):
    """Personalized recommendations from a user's reading history.

    A user vector is built as a recency-weighted average of the embeddings
    of the history titles found in the catalog (assumes user_history is
    ordered oldest -> newest, so the most recent title gets the largest
    weight).  Users with no recognized titles fall back to popular books.

    Raises:
        HTTPException: 503 while the index is still loading.
    """
    if state["index"] is None:
        raise HTTPException(status_code=503, detail="Service not ready")
    # Map history titles to catalog rows (case/whitespace-insensitive).
    valid_indices = []
    for title in request.user_history:
        normalized_title = title.lower().strip()
        if normalized_title in state["title_to_idx"]:
            valid_indices.append(state["title_to_idx"][normalized_title])
    if not valid_indices:
        # Cold start: nothing matched, return globally popular books.
        logger.info("Cold start user: returning popular books")
        results = []
        for idx in state["popular_indices"][:request.top_k]:
            results.append(BookResponse(
                title=state["titles"][idx],
                score=state["ratings"][idx],
                genres=str(state["genres"][idx])
            ))
        return results
    # Exponential recency decay: newest item gets weight 1.0, each older
    # item is down-weighted by decay_factor; weights are then normalized.
    history_vectors = state["embeddings"][valid_indices]
    n = len(valid_indices)
    decay_factor = 0.9
    weights = np.array([decay_factor ** (n - 1 - i) for i in range(n)])
    weights = weights / weights.sum()
    user_vector = np.average(history_vectors, axis=0, weights=weights).reshape(1, -1).astype(np.float32)
    faiss.normalize_L2(user_vector)
    # Over-fetch so filtering out already-read books and duplicate titles
    # still leaves enough candidates to fill top_k.
    search_k = (request.top_k * 3) + len(valid_indices)
    scores, indices = state["index"].search(user_vector, search_k)
    results = []
    seen_indices = set(valid_indices)
    seen_titles = set()
    for score, idx in zip(scores[0], indices[0]):
        # FAISS pads with -1 when fewer than search_k hits exist; the -1
        # would otherwise silently index the last catalog entry.
        if idx < 0:
            continue
        if idx in seen_indices:
            continue
        title = state["titles"][idx]
        if title in seen_titles:  # catalog may contain duplicate titles
            continue
        seen_titles.add(title)
        # Blend semantic similarity with a small popularity boost.
        final_score = float(score) + (state["ratings"][idx] * 0.1)
        results.append(BookResponse(
            title=title,
            score=final_score,
            genres=str(state["genres"][idx])
        ))
        if len(results) >= request.top_k:
            break
    # Re-sort: the popularity boost can reorder FAISS's similarity ranking.
    results.sort(key=lambda x: x.score, reverse=True)
    return results