Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import sys | |
| import time | |
| import warnings | |
| from contextlib import asynccontextmanager | |
| from typing import Dict, List | |
| # Add the project root to the Python path | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))) | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import Depends, FastAPI, HTTPException, Query, Request, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from sentence_transformers import SentenceTransformer | |
| from slowapi import _rate_limit_exceeded_handler | |
| from slowapi.errors import RateLimitExceeded | |
| from src.book_recommender.api.dependencies import ( | |
| get_clusters_data, | |
| get_recommender, | |
| get_sentence_transformer_model, | |
| limiter, | |
| ) | |
| from src.book_recommender.api.models import ( | |
| Book, | |
| BookCluster, | |
| BookSearchResult, | |
| BookStats, | |
| ExplainRecommendationRequest, | |
| ExplanationResponse, | |
| FeedbackRequest, | |
| FeedbackStatsResponse, | |
| RecommendationResult, | |
| RecommendByQueryRequest, | |
| RecommendByTitleRequest, | |
| RecommendByHistoryRequest | |
| ) | |
| from src.book_recommender.core.exceptions import DataNotFoundError | |
| from src.book_recommender.core.logging_config import configure_logging | |
| from src.book_recommender.ml.explainability import explain_recommendation | |
| from src.book_recommender.ml.feedback import get_all_feedback, save_feedback | |
| from src.book_recommender.ml.recommender import BookRecommender | |
| from src.book_recommender.utils import load_book_covers_batch | |
| from src.book_recommender.services.personalizer import PersonalizationService | |
# Shared client for the external Personalization Engine (used by recommend_personalized).
personalizer = PersonalizationService()

# Silence known-noisy deprecation warnings emitted by huggingface_hub downloads.
warnings.filterwarnings("ignore", message="resume_download is deprecated")
warnings.filterwarnings("ignore", category=FutureWarning, module="huggingface_hub")

# Configure application-wide logging before any request handling; level is
# overridable via the LOG_LEVEL environment variable (default INFO).
configure_logging(log_file="api.log", log_level=os.getenv("LOG_LEVEL", "INFO"))
logger = logging.getLogger(__name__)

# When TESTING_ENV=true, startup skips loading the heavy ML artifacts (see lifespan).
IS_TESTING = os.getenv("TESTING_ENV", "False").lower() == "true"
def log_exception(e: Exception) -> None:
    """Log an unexpected exception.

    Emits the full message and traceback only when DEBUG logging is enabled;
    otherwise logs a generic line so internal details stay out of production logs.
    """
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    if not debug_enabled:
        logger.error("An unexpected error occurred.")
        return
    logger.error(f"An unexpected error occurred: {e}", exc_info=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan manager: eagerly load core services and time startup.

    Fix: decorated with @asynccontextmanager — FastAPI's ``lifespan=`` hook
    requires an async context manager factory, and the import at the top of
    the file was otherwise unused. Code before ``yield`` runs at startup,
    code after it at shutdown. In test mode (TESTING_ENV=true) the heavy
    model loading is skipped entirely.
    """
    if not IS_TESTING:
        logger.info("=" * 30)
        logger.info("Starting DeepShelf API...")
        logger.info("=" * 30)
        start_time = time.time()
        try:
            # Each dependency getter caches its result, so warming them here
            # means the first real request pays no load cost.
            t0 = time.time()
            get_recommender()
            logger.info(f"Recommender loaded in {time.time() - t0:.1f}s")
            t0 = time.time()
            get_sentence_transformer_model()
            logger.info(f"Model loaded in {time.time() - t0:.1f}s")
            t0 = time.time()
            get_clusters_data()
            logger.info(f"Clusters loaded in {time.time() - t0:.1f}s")
            total_time = time.time() - start_time
            port = os.getenv("PORT", "8000")
            logger.info("=" * 30)
            logger.info(f"API ready in {total_time:.1f}s | http://0.0.0.0:{port}")
            logger.info("=" * 30)
        except Exception as e:
            # Fail fast: a broken core service should abort startup, not limp on.
            logger.error(f"Startup failed: {e}")
            raise
    else:
        logger.info("Test mode - skipping model loading")
    yield
    logger.info("Shutting down DeepShelf API...")
# Application instance; the lifespan hook preloads the heavy ML artifacts.
app = FastAPI(
    title="DeepShelf API",
    description="API for content-based book recommendations and book management.",
    version="0.1.0",
    lifespan=lifespan,
)

# slowapi rate limiting: the shared limiter lives on app.state, and 429
# responses are rendered by slowapi's stock handler. The handler already has
# the (request, exc) signature, so wrapping it in a lambda was redundant.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS: wide open. NOTE(review): allow_origins=["*"] combined with
# allow_credentials=True is contradictory per the CORS spec (browsers reject
# credentialed requests against a wildcard origin) — pin concrete origins
# before relying on cookies/auth headers cross-site.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def add_security_headers(request: Request, call_next):
    """HTTP middleware hook that stamps basic security headers on every response.

    NOTE(review): this looks like it should be registered via
    ``@app.middleware("http")`` — the decorator is not visible here; confirm.
    """
    response = await call_next(request)
    hardened = {
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
    }
    for header_name, header_value in hardened.items():
        response.headers[header_name] = header_value
    return response
| from fastapi.responses import RedirectResponse | |
async def root():
    """Redirects to the API documentation."""
    docs_url = "/docs"
    return RedirectResponse(url=docs_url)
async def health_check(request: Request):
    """
    Checks the health of the API and its core components.

    Returns a 200 payload when the recommender, embedding model and cluster
    data can all be obtained; otherwise responds 503.
    """
    try:
        # Touch each cached dependency; any load failure surfaces here.
        for load_service in (get_recommender, get_sentence_transformer_model, get_clusters_data):
            load_service()
        return {"status": "OK", "message": "DeepShelf API is healthy and core services are loaded."}
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Core services (recommender/embedding model/clusters) are not available: {e}",
        )
async def recommend_by_query(
    request: Request,
    body: RecommendByQueryRequest,
    recommender: BookRecommender = Depends(get_recommender),
    model: SentenceTransformer = Depends(get_sentence_transformer_model),
):
    """
    Provides book recommendations by semantically comparing a natural language query
    against the book embedding database.
    """

    def _as_book(rec: dict) -> Book:
        # Map a raw recommendation record onto the API Book schema;
        # authors/genres are stored as comma-separated strings when present.
        authors = rec.get("authors")
        genres = rec.get("genres")
        return Book(
            id=str(rec["id"]),
            title=rec["title"],
            authors=authors.split(", ") if isinstance(authors, str) else [],
            description=rec.get("description"),
            genres=genres.split(", ") if isinstance(genres, str) else [],
            cover_image_url=rec.get("cover_image_url"),
        )

    try:
        query_embedding = model.encode(body.query, show_progress_bar=False)
        recommendations = recommender.get_recommendations_from_vector(query_embedding, top_k=body.top_k)

        # Backfill covers the recommender didn't supply with one batch lookup.
        without_covers = [rec for rec in recommendations if not rec.get("cover_image_url")]
        if without_covers:
            covers_map = load_book_covers_batch(without_covers)
            for rec in recommendations:
                if not rec.get("cover_image_url"):
                    rec["cover_image_url"] = covers_map.get(rec["title"])

        return [
            RecommendationResult(book=_as_book(rec), similarity_score=rec["similarity"])
            for rec in recommendations
        ]
    except DataNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during recommendation.",
        )
async def recommend_personalized(
    request: Request,
    body: RecommendByHistoryRequest,
    recommender: BookRecommender = Depends(get_recommender),
):
    """
    Calls the external Personalization Engine (Port 8001) to get
    semantic recommendations, then hydrates the results with local book metadata (cover, authors, etc).

    Titles returned by the personalizer that cannot be matched against the
    local catalog are silently dropped from the response.
    """
    try:
        # NOTE(review): assumes each personalizer record carries "title" and
        # "score" keys — that schema is owned by PersonalizationService and
        # not visible here; confirm against the service.
        semantic_recs = personalizer.get_recommendations(body.user_history,top_k=body.top_k)
        if not semantic_recs:
            return []
        results = []
        books_needing_covers = []
        for rec in semantic_recs:
            # Try exact match first
            mask = recommender.book_data['title'] == rec['title']
            local_book_df = recommender.book_data[mask]
            # Fallback to loose match (case-insensitive, whitespace-trimmed)
            if local_book_df.empty:
                mask = recommender.book_data['title'].str.lower().str.strip() == rec['title'].lower().strip()
                local_book_df = recommender.book_data[mask]
            if not local_book_df.empty:
                # If several catalog rows share the title, the first one wins.
                row = local_book_df.iloc[0]
                book = Book(
                    id=str(row["id"]),
                    title=row["title"],
                    authors=(row.get("authors", "").split(", ") if isinstance(row.get("authors"), str) else []),
                    description=row.get("description"),
                    genres=(row.get("genres", "").split(", ") if isinstance(row.get("genres"), str) else []),
                    cover_image_url=row.get("cover_image_url")
                )
                if not book.cover_image_url:
                    books_needing_covers.append(dict(row))
                results.append(RecommendationResult(book=book, similarity_score=rec["score"]))
        # Fetch any missing covers in a single batch call, then patch them in.
        if books_needing_covers:
            covers_map = load_book_covers_batch(books_needing_covers)
            for rec in results:
                if not rec.book.cover_image_url:
                    rec.book.cover_image_url = covers_map.get(rec.book.title)
        return results
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during personalization.",
        )
async def recommend_by_title(
    request: Request,
    body: RecommendByTitleRequest,
    recommender: BookRecommender = Depends(get_recommender),
):
    """
    Provides recommendations for books similar to a given title.

    Responds 404 when the title is unknown or nothing clears the similarity
    threshold; unexpected failures become a 500.
    """

    def _as_book(rec: dict) -> Book:
        # Convert a raw recommendation record into the API Book schema.
        authors = rec.get("authors")
        genres = rec.get("genres")
        return Book(
            id=str(rec["id"]),
            title=rec["title"],
            authors=authors.split(", ") if isinstance(authors, str) else [],
            description=rec.get("description"),
            genres=genres.split(", ") if isinstance(genres, str) else [],
            cover_image_url=rec.get("cover_image_url"),
        )

    try:
        recommendations = recommender.get_recommendations(body.title, top_k=body.top_k)
        if not recommendations:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Book with title '{body.title}' not found or no recommendations met the similarity threshold.",
            )
        # Backfill any missing cover URLs with a single batch lookup.
        without_covers = [rec for rec in recommendations if not rec.get("cover_image_url")]
        if without_covers:
            covers_map = load_book_covers_batch(without_covers)
            for rec in recommendations:
                if not rec.get("cover_image_url"):
                    rec["cover_image_url"] = covers_map.get(rec["title"])
        return [
            RecommendationResult(book=_as_book(rec), similarity_score=rec["similarity"])
            for rec in recommendations
        ]
    except DataNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except HTTPException:
        # Preserve the deliberate 404 above; don't let it collapse into a 500.
        raise
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during recommendation.",
        )
async def list_books(
    request: Request,
    recommender: BookRecommender = Depends(get_recommender),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(10, ge=1, le=100, description="Number of items per page"),
):
    """
    Retrieves a paginated list of all books in the catalog.
    """
    try:
        catalog = recommender.book_data
        offset = (page - 1) * page_size
        page_df = catalog.iloc[offset : offset + page_size]
        books = [
            Book(
                id=str(rec["id"]),
                title=rec["title"],
                authors=(rec.get("authors", "").split(", ") if isinstance(rec.get("authors"), str) else []),
                description=rec.get("description"),
                genres=(rec.get("genres", "").split(", ") if isinstance(rec.get("genres"), str) else []),
                cover_image_url=rec.get("cover_image_url"),
            )
            for _, rec in page_df.iterrows()
        ]
        return BookSearchResult(books=books, total=len(catalog), page=page, page_size=page_size)
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while listing books.",
        )
async def search_books(
    request: Request,
    recommender: BookRecommender = Depends(get_recommender),
    query: str = Query(
        ...,
        min_length=2,
        max_length=255,
        description="Search query for title or author",
    ),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(10, ge=1, le=100, description="Number of items per page"),
):
    """
    Searches for books by matching the query against book titles or author names.
    The search is case-insensitive.

    Raises:
        HTTPException 400: the query is empty or only whitespace.
        HTTPException 500: any unexpected failure.
    """
    try:
        sanitized_query = query.strip()
        if not sanitized_query:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Query cannot be empty or just whitespace.",
            )
        all_books_df = recommender.book_data
        needle = sanitized_query.lower()
        # regex=False: treat the user's query as a literal substring. The
        # previous default (regex=True) made inputs like "C++" or "(" raise
        # re.error, which surfaced as an HTTP 500.
        mask = all_books_df["title_lower"].str.contains(needle, regex=False, na=False) | (
            all_books_df["authors_lower"].str.contains(needle, regex=False, na=False)
        )
        filtered_books_df = all_books_df[mask]
        total_books = len(filtered_books_df)
        start_index = (page - 1) * page_size
        paginated_books_df = filtered_books_df.iloc[start_index : start_index + page_size]
        books = []
        for _, rec in paginated_books_df.iterrows():
            book = Book(
                id=str(rec["id"]),
                title=rec["title"],
                authors=(rec.get("authors", "").split(", ") if isinstance(rec.get("authors"), str) else []),
                description=rec.get("description"),
                genres=(rec.get("genres", "").split(", ") if isinstance(rec.get("genres"), str) else []),
                cover_image_url=rec.get("cover_image_url"),
            )
            books.append(book)
        return BookSearchResult(books=books, total=total_books, page=page, page_size=page_size)
    except HTTPException:
        # Bug fix: the deliberate 400 above previously fell through to the
        # generic handler and was returned as a 500. Re-raise HTTP errors as-is.
        raise
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while searching books.",
        )
async def get_stats(request: Request, recommender: BookRecommender = Depends(get_recommender)):
    """
    Provides various statistics about the book dataset, including total book count,
    genre distribution, and author distribution.
    """
    try:
        catalog = recommender.book_data

        def _value_counts(column: str) -> dict:
            # Explode a comma-separated, case-folded column into frequency counts.
            exploded = catalog[column].str.lower().str.split(", ").explode().dropna()
            return exploded.value_counts().to_dict()

        return BookStats(
            total_books=len(catalog),
            genres_count=_value_counts("genres"),
            authors_count=_value_counts("authors"),
        )
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while fetching statistics.",
        )
async def list_clusters(
    request: Request,
    clusters_data: tuple[np.ndarray, dict, pd.DataFrame] = Depends(get_clusters_data),
):
    """
    Retrieves a list of all identified book clusters, including their names, sizes,
    and a sample of top books from each cluster.

    The per-cluster preview is a random sample of up to 3 books, so repeated
    calls may return different samples.
    """
    try:
        _, cluster_names, labeled_books = clusters_data
        clusters = []
        for cluster_id, name in cluster_names.items():
            members = labeled_books[labeled_books["cluster_id"] == cluster_id]
            sample_books = []
            if not members.empty:
                preview = members.sample(min(len(members), 3)).to_dict(orient="records")
                for rec in preview:
                    sample_books.append(
                        Book(
                            id=str(rec["id"]),
                            title=rec["title"],
                            authors=(rec.get("authors", "").split(", ") if isinstance(rec.get("authors"), str) else []),
                            description=rec.get("description"),
                            genres=(rec.get("genres", "").split(", ") if isinstance(rec.get("genres"), str) else []),
                            cover_image_url=rec.get("cover_image_url"),
                        )
                    )
            clusters.append(
                BookCluster(
                    id=cluster_id,
                    name=name,
                    size=len(members),
                    top_books=sample_books,
                )
            )
        return clusters
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while listing clusters.",
        )
async def get_books_in_cluster(
    request: Request,
    cluster_id: int,
    clusters_data: tuple[np.ndarray, dict, pd.DataFrame] = Depends(get_clusters_data),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(10, ge=1, le=100, description="Number of items per page"),
):
    """
    Retrieves a paginated list of books belonging to a specific cluster.

    Responds 404 for an unknown cluster_id; unexpected failures become a 500.
    """
    try:
        _, cluster_names, labeled_books = clusters_data
        if cluster_id not in cluster_names:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Cluster with ID {cluster_id} not found.",
            )
        members = labeled_books[labeled_books["cluster_id"] == cluster_id]
        offset = (page - 1) * page_size
        page_df = members.iloc[offset : offset + page_size]
        books = [
            Book(
                id=str(rec["id"]),
                title=rec["title"],
                authors=(rec.get("authors", "").split(", ") if isinstance(rec.get("authors"), str) else []),
                description=rec.get("description"),
                genres=(rec.get("genres", "").split(", ") if isinstance(rec.get("genres"), str) else []),
                cover_image_url=rec.get("cover_image_url"),
            )
            for _, rec in page_df.iterrows()
        ]
        return BookSearchResult(books=books, total=len(members), page=page, page_size=page_size)
    except HTTPException:
        raise
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while fetching books in cluster.",
        )
async def get_cluster_sample(
    request: Request,
    cluster_id: int,
    clusters_data: tuple[np.ndarray, dict, pd.DataFrame] = Depends(get_clusters_data),
    sample_size: int = Query(5, ge=1, le=20, description="Number of sample books to return"),
):
    """
    Retrieves a random sample of books from a specified cluster.

    Responds 404 for an unknown cluster_id; an empty cluster yields [].
    """
    try:
        _, cluster_names, labeled_books = clusters_data
        if cluster_id not in cluster_names:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Cluster with ID {cluster_id} not found.",
            )
        members = labeled_books[labeled_books["cluster_id"] == cluster_id]
        if members.empty:
            return []
        # Sample without replacement, capped at the cluster's actual size.
        chosen = members.sample(min(len(members), sample_size))
        return [
            Book(
                id=str(rec["id"]),
                title=rec["title"],
                authors=(rec.get("authors", "").split(", ") if isinstance(rec.get("authors"), str) else []),
                description=rec.get("description"),
                genres=(rec.get("genres", "").split(", ") if isinstance(rec.get("genres"), str) else []),
                cover_image_url=rec.get("cover_image_url"),
            )
            for _, rec in chosen.iterrows()
        ]
    except HTTPException:
        raise
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while fetching cluster sample.",
        )
async def explain_recommendation_endpoint(request: Request, body: ExplainRecommendationRequest):
    """
    Generates a human-readable explanation for why a specific book was recommended
    based on a user query and the book's attributes.
    """
    try:
        explanation = explain_recommendation(
            query_text=body.query_text,
            recommended_book=body.recommended_book.model_dump(),
            similarity_score=body.similarity_score,
        )
        return ExplanationResponse(**explanation)
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error during explanation generation.",
        )
async def submit_feedback(
    request: Request,
    body: FeedbackRequest,
    recommender: BookRecommender = Depends(get_recommender),
):
    """
    Allows users to submit positive or negative feedback on a book recommendation.

    Responds 404 when the referenced book id is not in the catalog.
    """
    try:
        matches = recommender.book_data[recommender.book_data["id"] == body.book_id]
        if matches.empty:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Book with ID {body.book_id} not found.",
            )
        # If several rows share the id, the first one is recorded.
        save_feedback(
            query=body.query,
            book_details=matches.iloc[0].to_dict(),
            feedback_type=body.feedback_type,
            session_id=body.session_id,
        )
        return {"message": "Feedback submitted successfully"}
    except HTTPException:
        raise
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while submitting feedback.",
        )
async def get_feedback_stats(request: Request):
    """
    Retrieves aggregated statistics about the collected user feedback.

    Aggregates totals plus per-book-title and per-query positive/negative
    tallies in a single pass over the feedback records.
    """
    try:
        all_feedback = get_all_feedback()
        positive = 0
        negative = 0
        by_title: Dict[str, Dict[str, int]] = {}
        by_query: Dict[str, Dict[str, int]] = {}
        for entry in all_feedback:
            kind = entry["feedback"]
            if kind == "positive":
                positive += 1
            elif kind == "negative":
                negative += 1
            title_bucket = by_title.setdefault(entry.get("book_title", "Unknown Book"), {"positive": 0, "negative": 0})
            title_bucket[kind] += 1
            query_bucket = by_query.setdefault(entry.get("query", "Unknown Query"), {"positive": 0, "negative": 0})
            query_bucket[kind] += 1
        return FeedbackStatsResponse(
            total_feedback=len(all_feedback),
            positive_feedback=positive,
            negative_feedback=negative,
            feedback_by_book_title=by_title,
            feedback_by_query=by_query,
        )
    except Exception as e:
        log_exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error while fetching feedback statistics.",
        )
def main():
    """Entry point for uvicorn"""
    # Imported lazily so merely importing this module never pulls in the server.
    import uvicorn

    bind_port = int(os.getenv("PORT", "8000"))
    uvicorn.run(app, host="0.0.0.0", port=bind_port)


if __name__ == "__main__":
    main()