Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import psycopg2 | |
| from dotenv import load_dotenv | |
| from psycopg2.extras import RealDictCursor | |
| import numpy as np | |
| import os | |
| from huggingface_hub import hf_hub_download | |
| # ------------------------- | |
| # App Setup | |
| # ------------------------- | |
# The ASGI application object; the endpoint functions below belong to it.
app = FastAPI(title="Movie Recommender API")

# CORS is left fully open: any origin, any HTTP method, any request header.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)

# Outside production, pull settings (e.g. the DB_* variables read by
# get_db) from a local .env file; in production the environment is used as-is.
if not os.getenv("ENV") == "production":
    load_dotenv()
| # ------------------------- | |
| # Load Models on Startup | |
| # ------------------------- | |
def _unit_rows(matrix):
    """Return *matrix* with every row divided by that row's L2 norm."""
    row_lengths = np.linalg.norm(matrix, axis=1, keepdims=True)
    return matrix / row_lengths


# Trained feature matrix, read from a file next to the application.
compressed_features = np.load('new_compressed_features.npy')
# Row-normalised copy, so a dot product against it is a cosine similarity.
movie_norm = _unit_rows(compressed_features)
print(f"Loaded trained: {compressed_features.shape}")

# Untrained feature matrix, downloaded from a Hugging Face dataset repo.
untrained_path = hf_hub_download(
    repo_id="JustinDuvivier/untrained",
    filename="untrained_features.npy",
    repo_type="dataset",
)
untrained_features = np.load(untrained_path)
untrained_norm = _unit_rows(untrained_features)
print(f"Loaded untrained: {untrained_features.shape}")
| # ------------------------- | |
| # Database Connection | |
| # ------------------------- | |
def get_db():
    """Open and return a new PostgreSQL connection.

    Host, database name, user and password come from the DB_HOST, DB_NAME,
    DB_USER and DB_PASSWORD environment variables; the port is fixed at 6543.
    Cursors created from the connection are ``RealDictCursor`` instances, so
    rows are addressed by column name. The caller is responsible for closing
    the connection.
    """
    connection_settings = {
        "host": os.getenv("DB_HOST"),
        "database": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        "port": 6543,
        "cursor_factory": RealDictCursor,
    }
    return psycopg2.connect(**connection_settings)
| # ------------------------- | |
| # Request Models | |
| # ------------------------- | |
class RecommendRequest(BaseModel):
    """Request body accepted by the recommendation endpoints.

    Only ``movie_ids`` is required. Each optional filter left as ``None`` is
    replaced by a value inferred from the selected movies.
    """

    # Selected movies: used both as row indices into the feature matrices
    # and as ``id`` values in the ``cleaned_movies`` table.
    movie_ids: list[int]

    # Maximum number of movies returned.
    n: int = 20

    # Lower bound on ``vote_average``.
    min_rating: Optional[float] = None

    # Inclusive ``release_year`` range.
    year_from: Optional[int] = None
    year_to: Optional[int] = None

    # Inclusive ``runtime`` range.
    min_runtime: Optional[int] = None
    max_runtime: Optional[int] = None

    # Share of the final score taken from the quality score; the remainder
    # comes from the similarity.
    quality_weight: float = 0.1
| # ------------------------- | |
| # Helper Functions | |
| # ------------------------- | |
def _rank_by_cosine(features, unit_rows, movie_ids, k):
    """Rank all rows of *unit_rows* by cosine similarity to the selection.

    The query vector is the mean of ``features[movie_ids]``, scaled to unit
    length. Rows are visited from most to least similar; the selected movies
    themselves and rows whose similarity is not finite are skipped, and the
    walk stops once *k* neighbours have been collected.

    Args:
        features: Feature matrix, one row per movie.
        unit_rows: The same matrix with each row scaled to unit length.
        movie_ids: Row indices of the movies the user selected.
        k: Maximum number of neighbours to return.

    Returns:
        A list of ``{"id": int, "similarity": float}`` dicts, best first.
        Empty when *k* is not positive, when *movie_ids* is empty, or when
        the averaged query vector has zero or non-finite length.
    """
    if k <= 0 or not movie_ids:
        return []

    query = np.mean(features[movie_ids], axis=0)
    query_length = np.linalg.norm(query)
    # Dividing by a zero length would turn every similarity into NaN.
    if not np.isfinite(query_length) or query_length == 0:
        return []

    similarities = (query / query_length) @ unit_rows.T
    ranked = np.argsort(similarities)[::-1]

    # Set membership keeps the exclusion test O(1) per candidate.
    excluded = set(movie_ids)
    neighbours = []
    for idx in ranked:
        candidate = int(idx)
        if candidate in excluded:
            continue
        score = float(similarities[idx])
        # NaN sorts to the front after the reversal above (e.g. for an
        # all-zero feature row); such rows are not meaningful neighbours.
        if not np.isfinite(score):
            continue
        neighbours.append({"id": candidate, "similarity": score})
        if len(neighbours) == k:
            break
    return neighbours


def knn_search(movie_ids: list[int], k: int = 200):
    """Find the k most similar movies using TRAINED features.

    The selected movies are excluded from the result, and up to *k* other
    movies are returned (the exclusion no longer eats into the k slots).
    See ``_rank_by_cosine`` for the ranking rules.
    """
    return _rank_by_cosine(compressed_features, movie_norm, movie_ids, k)


def knn_search_untrained(movie_ids: list[int], k: int = 200):
    """Find the k most similar movies using UNTRAINED features.

    Same contract as ``knn_search``, but ranks against the untrained
    feature matrix.
    """
    return _rank_by_cosine(untrained_features, untrained_norm, movie_ids, k)
def infer_filters(movie_ids: list[int], conn) -> dict:
    """Infer year range, rating floor and runtime range from selected movies.

    Aggregates the ``cleaned_movies`` rows whose ``id`` is in *movie_ids*
    and widens the observed ranges:

    * years: 10 below the earliest to 10 above the latest release year;
    * rating: lowest ``vote_average`` minus 1.0, never below 0;
    * runtime: shortest minus 75 (at least 60) to longest plus 30
      (at most 300).

    Missing rating or runtime aggregates (every selected row is NULL for
    that column) fall back to 0, 90 and 120 respectively before widening.

    Args:
        movie_ids: Movie ids to aggregate over.
        conn: Open database connection whose cursors return rows that can
            be indexed by column name. It is not closed here.

    Returns:
        A dict with keys ``year_from``, ``year_to``, ``min_rating``,
        ``min_runtime`` and ``max_runtime``. Permissive defaults are
        returned when no matching row has a release year.
    """
    # The cursor is released as soon as the single row has been read.
    with conn.cursor() as cur:
        cur.execute("""
            SELECT
                MIN(release_year) as min_year,
                MAX(release_year) as max_year,
                AVG(vote_average) as avg_rating,
                MIN(vote_average) as min_rating,
                MIN(runtime) as min_runtime,
                MAX(runtime) as max_runtime,
                AVG(runtime) as avg_runtime
            FROM cleaned_movies
            WHERE id = ANY(%s)
        """, (movie_ids,))
        row = cur.fetchone()

    if not row or row['min_year'] is None:
        return {
            'year_from': 1900,
            'year_to': 2100,
            'min_rating': 0,
            'min_runtime': 60,
            'max_runtime': 300
        }

    year_from = int(row['min_year']) - 10
    year_to = int(row['max_year']) + 10
    # MIN(vote_average) is NULL when every selected movie lacks a rating;
    # float(None) would raise, so treat that case as a rating of 0.
    lowest_rating = row['min_rating']
    if lowest_rating is None:
        lowest_rating = 0
    min_rating = max(0, float(lowest_rating) - 1.0)
    min_runtime = max(60, int(row['min_runtime'] or 90) - 75)
    max_runtime = min(300, int(row['max_runtime'] or 120) + 30)

    return {
        'year_from': year_from,
        'year_to': year_to,
        'min_rating': round(min_rating, 1),
        'min_runtime': min_runtime,
        'max_runtime': max_runtime
    }
def calculate_quality_score(vote_average: float) -> float:
    """Map a vote average to a quality score clamped to [0.0, 1.0].

    The vote average is divided by 10; results at or below zero become 0.0
    and results at or above one become 1.0.
    """
    scaled = vote_average / 10.0
    if not scaled > 0.0:
        return 0.0
    return scaled if scaled < 1.0 else 1.0
| # ------------------------- | |
| # Endpoints | |
| # ------------------------- | |
# NOTE(review): no route decorator is visible on this function in this
# view; confirm that it is registered on `app`.
async def root():
    """Report that the service is running and how many rows each matrix has."""
    trained_rows = compressed_features.shape[0]
    untrained_rows = untrained_features.shape[0]
    return dict(
        status="running",
        trained_features=trained_rows,
        untrained_features=untrained_rows,
    )
# NOTE(review): no route decorator is visible on this function in this
# view; confirm that it is registered on `app`.
async def test_db():
    """Check database connectivity by counting rows in ``cleaned_movies``.

    Returns:
        ``{"connected": True, "movie_count": <int>}`` on success, otherwise
        ``{"connected": False, "error": <message>}``. This function never
        raises: it is a health probe, so every failure is reported in the
        response body instead.
    """
    try:
        conn = get_db()
        try:
            cur = conn.cursor()
            cur.execute("SELECT COUNT(*) as count FROM cleaned_movies")
            result = cur.fetchone()
        finally:
            # Close even when the query fails, so a broken database does
            # not leak one connection per probe.
            conn.close()
        return {"connected": True, "movie_count": result["count"]}
    except Exception as e:
        # Deliberately broad: any failure means "not connected".
        return {"connected": False, "error": str(e)}
# NOTE(review): no route decorator is visible on this function in this
# view; confirm that it is registered on `app`.
async def search_movies(q: str, n: int = 10):
    """Search movies whose title contains *q*, case-insensitively.

    Only movies with a non-empty poster are returned.

    Args:
        q: Substring to look for in the title. It is passed as a bound
            parameter; ``%`` and ``_`` inside it still act as ILIKE
            wildcards.
        n: Maximum number of rows to return (SQL ``LIMIT``).

    Returns:
        A list of dicts with keys ``id``, ``title`` and ``poster``.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, title, poster
            FROM cleaned_movies
            WHERE title ILIKE %s
            AND poster IS NOT NULL
            AND poster != ''
            LIMIT %s
        """, (f"%{q}%", n))
        rows = cur.fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()
    return [dict(row) for row in rows]
# NOTE(review): no route decorator is visible on this function in this
# view; confirm that it is registered on `app`.
async def get_movie(movie_id: int):
    """Return the stored details of one movie.

    Args:
        movie_id: Value of the ``id`` column in ``cleaned_movies``.

    Returns:
        A dict with keys ``id``, ``title``, ``poster``, ``genres``,
        ``keywords``, ``vote_average``, ``runtime``, ``release_year`` and
        ``description``.

    Raises:
        HTTPException: 404 when no row has this id.
    """
    conn = get_db()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, title, poster, genres, keywords, vote_average, runtime, release_year, description
            FROM cleaned_movies
            WHERE id = %s
        """, (movie_id,))
        row = cur.fetchone()
    finally:
        # Release the connection even if the query raises.
        conn.close()
    if not row:
        raise HTTPException(status_code=404, detail="Movie not found")
    return dict(row)
def _build_recommendations(req, feature_count, search, model_label):
    """Shared pipeline behind both recommendation endpoints.

    Steps: validate the selected ids, resolve the filters (explicit request
    values win over values inferred from the selection), fetch up to 200
    nearest neighbours, keep those that pass the filters and have a poster,
    then rank by a blend of similarity and quality.

    Args:
        req: The ``RecommendRequest`` being served.
        feature_count: Number of rows in the feature matrix that *search*
            indexes; every id in ``req.movie_ids`` must be in
            ``[0, feature_count)``.
        search: Callable ``(movie_ids, k=...)`` returning a list of
            ``{"id", "similarity"}`` dicts.
        model_label: Value reported under the ``"model"`` key.

    Returns:
        A dict with keys ``model``, ``filters_used`` and ``movies``.

    Raises:
        HTTPException: 400 when no ids are given or an id is out of range.
    """
    if not req.movie_ids:
        raise HTTPException(status_code=400, detail="No movie IDs provided")
    for mid in req.movie_ids:
        if mid >= feature_count or mid < 0:
            raise HTTPException(status_code=400, detail=f"Invalid movie ID: {mid}")

    conn = get_db()
    try:
        inferred = infer_filters(req.movie_ids, conn)
        # A filter left as None in the request falls back to the inferred one.
        filters = {}
        for name in ("min_rating", "year_from", "year_to", "min_runtime", "max_runtime"):
            requested = getattr(req, name)
            filters[name] = inferred[name] if requested is None else requested

        knn_results = search(req.movie_ids, k=200)
        if not knn_results:
            return {"model": model_label, "filters_used": {}, "movies": []}

        candidate_ids = [r["id"] for r in knn_results]
        similarity_map = {r["id"]: r["similarity"] for r in knn_results}

        cur = conn.cursor()
        cur.execute("""
            SELECT id, title, poster, genres, vote_average, runtime, release_year, description
            FROM cleaned_movies
            WHERE id = ANY(%s)
            AND vote_average >= %s
            AND release_year >= %s
            AND release_year <= %s
            AND runtime >= %s
            AND runtime <= %s
            AND poster IS NOT NULL
            AND poster != ''
        """, (
            candidate_ids,
            filters["min_rating"],
            filters["year_from"],
            filters["year_to"],
            filters["min_runtime"],
            filters["max_runtime"],
        ))
        rows = cur.fetchall()
    finally:
        conn.close()

    results = []
    for row in rows:
        movie = dict(row)
        similarity = similarity_map.get(int(row["id"]), 0)
        quality = calculate_quality_score(float(row["vote_average"] or 0))
        # Weighted blend: quality_weight of quality, the rest of similarity.
        combined_score = (1 - req.quality_weight) * similarity + req.quality_weight * quality
        movie["similarity"] = round(similarity, 3)
        movie["quality_score"] = round(quality, 3)
        movie["score"] = round(combined_score, 3)
        results.append(movie)
    results.sort(key=lambda x: x["score"], reverse=True)

    # A negative n would slice from the tail (results[:-3]); clamp it so a
    # non-positive n returns no movies.
    limit = max(req.n, 0)
    return {
        "model": model_label,
        "filters_used": {**filters, "quality_weight": req.quality_weight},
        "movies": results[:limit]
    }


# NOTE(review): no route decorators are visible on the two endpoint
# functions below in this view; confirm that they are registered on `app`.
def get_recommendations(req: RecommendRequest):
    """Recommend movies using the TRAINED feature matrix.

    Returns a dict with ``model`` set to ``"trained"``, the filters that
    were applied, and at most ``req.n`` movies ordered by descending score.
    Raises ``HTTPException`` (400) for an empty or out-of-range selection.
    """
    return _build_recommendations(
        req, len(compressed_features), knn_search, "trained"
    )


def get_recommendations_untrained(req: RecommendRequest):
    """Recommend movies using the UNTRAINED feature matrix.

    Same contract as ``get_recommendations``, with ``model`` set to
    ``"untrained"``.
    """
    return _build_recommendations(
        req, len(untrained_features), knn_search_untrained, "untrained"
    )