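# Semantic Book Discovery Engine: FastAPI service exposing semantic search and
# personalized recommendations over a book catalog, backed by a SentenceTransformer
# encoder and a FAISS similarity index.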
from contextlib import asynccontextmanager
from pathlib import Path
import logging
from sentence_transformers import SentenceTransformer
from prometheus_fastapi_instrumentator import Instrumentator
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from typing import List
import pandas as pd
import faiss
import numpy as np
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DATA_DIR = Path("data")
CATALOG_PATH = DATA_DIR / "catalog" / "books_catalog.csv"
EMBEDDINGS_PATH = DATA_DIR / "embeddings_cache.npy"
MODEL_NAME = "all-MiniLM-L6-v2"
state = {
"titles": [],
"title_to_idx": {},
"index": None,
"embeddings": None,
"ratings": [],
"genres": [],
"model": None,
"popular_indices": []
}
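# Load all heavy resources (catalog, cached embeddings, FAISS index, encoder model)
# once at startup; the code after `yield` runs on shutdown.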
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Loading resources...")
start_time = time.time()
if not CATALOG_PATH.exists() or not EMBEDDINGS_PATH.exists():
logger.error("Missing catalog or embeddings! Run scripts/1b... first.")
else:
try:
df = pd.read_csv(CATALOG_PATH)
state["titles"] = df['title'].tolist()
state["genres"] = df['genres'].fillna("").tolist()
raw_ratings = pd.to_numeric(df['rating'], errors='coerce').fillna(3.0)
max_rating = raw_ratings.max()
state["ratings"] = (raw_ratings / max_rating).tolist() if max_rating > 0 else [0.5] * len(df)
state["title_to_idx"] = {t.lower().strip(): i for i, t in enumerate(state["titles"])}
state["popular_indices"] = np.argsort(raw_ratings)[::-1][:50].tolist()
logger.info("Loading embeddings...")
            embeddings = np.asarray(np.load(EMBEDDINGS_PATH), dtype=np.float32)  # FAISS requires float32 vectors
state["embeddings"] = embeddings
OPTIMIZED_INDEX_PATH = DATA_DIR / "index" / "optimized.index"
if OPTIMIZED_INDEX_PATH.exists():
logger.info("Loading OPTIMIZED FAISS index (IVF-PQ)...")
state["index"] = faiss.read_index(str(OPTIMIZED_INDEX_PATH))
state["index"].nprobe = 10
else:
logger.info("Building Standard FAISS index (Flat)...")
d = embeddings.shape[1]
index = faiss.IndexFlatIP(d)
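                # L2-normalize in place so inner-product search equals cosine similarity;
                # this also normalizes the array cached in state["embeddings"].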
faiss.normalize_L2(embeddings)
index.add(embeddings)
state["index"] = index
logger.info(f"Loading Semantic Model ({MODEL_NAME})...")
state["model"] = SentenceTransformer(MODEL_NAME)
logger.info(f"Ready! Loaded {len(state['titles'])} books in {time.time() - start_time:.2f}s")
except Exception as e:
logger.error(f"Failed to load resources: {e}")
yield
logger.info("Shutting down...")
app = FastAPI(title="Semantic Book Discovery Engine", lifespan=lifespan)
@app.get("/")
async def read_root():
return RedirectResponse(url="/docs")
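# Instrument the app and expose Prometheus metrics (served at /metrics by default).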
Instrumentator().instrument(app).expose(app)
class RecommendationRequest(BaseModel):
user_history: List[str]
top_k: int = 10
class SearchRequest(BaseModel):
query: str
top_k: int = 10
class BookResponse(BaseModel):
title: str
score: float
genres: str
@app.post("/search", response_model=List[BookResponse])
async def search(request: SearchRequest):
if state["model"] is None or state["index"] is None:
raise HTTPException(status_code=503, detail="Service loading...")
query_vector = state["model"].encode([request.query], convert_to_numpy=True)
faiss.normalize_L2(query_vector)
scores, indices = state["index"].search(query_vector, request.top_k)
results = []
    for score, idx in zip(scores[0], indices[0]):
        if idx < 0: continue  # FAISS pads results with -1 when fewer than top_k neighbors exist
results.append(BookResponse(
title=state["titles"][idx],
score=float(score),
genres=str(state["genres"][idx])
))
return results
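# Personalized recommendations: map history titles to catalog rows, build a recency-weighted
# average embedding, search the index, and filter out books the user has already read.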
@app.post("/personalize/recommend", response_model=List[BookResponse])
async def recommend(request: RecommendationRequest):
if state["index"] is None:
raise HTTPException(status_code=503, detail="Service not ready")
valid_indices = []
for title in request.user_history:
normalized_title = title.lower().strip()
if normalized_title in state["title_to_idx"]:
valid_indices.append(state["title_to_idx"][normalized_title])
if not valid_indices:
logger.info("Cold start user: returning popular books")
results = []
for idx in state["popular_indices"][:request.top_k]:
results.append(BookResponse(
title=state["titles"][idx],
score=state["ratings"][idx],
genres=str(state["genres"][idx])
))
return results
history_vectors = state["embeddings"][valid_indices]
n = len(valid_indices)
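    # Recency weighting: the most recent title (last in user_history) gets weight 1.0,
    # and each earlier title is discounted by decay_factor.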
decay_factor = 0.9
weights = np.array([decay_factor ** (n - 1 - i) for i in range(n)])
weights = weights / weights.sum()
user_vector = np.average(history_vectors, axis=0, weights=weights).reshape(1, -1).astype(np.float32)
faiss.normalize_L2(user_vector)
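    # Over-fetch so enough candidates remain after filtering out the user's own history.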
search_k = (request.top_k * 3) + len(valid_indices)
scores, indices = state["index"].search(user_vector, search_k)
results = []
seen_indices = set(valid_indices)
seen_titles = set()
for score, idx in zip(scores[0], indices[0]):
        if idx < 0 or idx in seen_indices: continue  # skip padding (-1) and books already in the history
title = state["titles"][idx]
if title in seen_titles: continue
seen_titles.add(title)
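        # Blend semantic similarity with a small popularity boost from the normalized rating.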
final_score = float(score) + (state["ratings"][idx] * 0.1)
results.append(BookResponse(
title=title,
score=final_score,
genres=str(state["genres"][idx])
))
if len(results) >= request.top_k:
break
results.sort(key=lambda x: x.score, reverse=True)
return results