import os import base64 import json import traceback import numpy as np import cv2 import requests from deepface import DeepFace from dotenv import load_dotenv from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from google import genai from ytmusic_client import ( YouTubeMusicError, search_songs, get_song_info, search_artists, get_artist_songs, recommend_song_for_emotion, ) os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' load_dotenv() GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-3-flash-preview") if GEMINI_API_KEY: GEMINI_API_KEY = GEMINI_API_KEY.strip().replace('"', '').replace("'", "") if not GEMINI_API_KEY or GEMINI_API_KEY == "YOUR_API_KEY_HERE": print("⚠️ WARNING: GEMINI_API_KEY not found or using placeholder.") GEMINI_CLIENT = None else: masked_key = f"{GEMINI_API_KEY[:4]}...{GEMINI_API_KEY[-4:]}" print(f"✅ API Key detected: {masked_key} (Length: {len(GEMINI_API_KEY)})") print(f"✅ Using Gemini model: {GEMINI_MODEL}") GEMINI_CLIENT = genai.Client(api_key=GEMINI_API_KEY) YTMUSIC_OAUTH_FILE = os.getenv("YTMUSIC_OAUTH_FILE", "oauth.json") YTMUSIC_CLIENT_ID = os.getenv("YTMUSIC_CLIENT_ID") YTMUSIC_CLIENT_SECRET = os.getenv("YTMUSIC_CLIENT_SECRET") if os.path.exists(YTMUSIC_OAUTH_FILE): import json with open(YTMUSIC_OAUTH_FILE, 'r') as f: oauth_data = json.load(f) if "oauth_credentials" in oauth_data: print(f"✅ YouTube Music OAuth file found with credentials: {YTMUSIC_OAUTH_FILE}") else: print(f"ℹ️ YouTube Music OAuth file found but incomplete: {YTMUSIC_OAUTH_FILE}") else: print(f"ℹ️ YouTube Music OAuth file not found: {YTMUSIC_OAUTH_FILE}") print(" Run: ytmusicapi oauth to set up authentication (optional)") app = FastAPI(title="Ytapp – YouTube Music Mood-based Recommender") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class TextMoodRequest(BaseModel): text: str class RecommendationResponse(BaseModel): mood_label: str mood_score: float video_id: str | None title: str | None artists: list[str] | None album: str | None duration: str | None image_url: str | None external_url: str | None def _analyze_face_deepface(image_bytes: bytes) -> tuple[str, float]: npimg = np.frombuffer(image_bytes, np.uint8) img = cv2.imdecode(npimg, cv2.IMREAD_COLOR) result = DeepFace.analyze(img, actions=['emotion'], enforce_detection=False) res = result[0] if isinstance(result, list) else result emotions_dict = {key: float(value) for key, value in res['emotion'].items() if key != 'disgust'} total = sum(emotions_dict.values()) if total > 0: emotions_dict = {key: (value / total) * 100 for key, value in emotions_dict.items()} dominant = max(emotions_dict, key=emotions_dict.get) score = emotions_dict[dominant] / 100.0 emotion_map = { "happy": "joy", "sad": "sadness", "angry": "anger", "fear": "fear", "surprise": "surprise", "neutral": "neutral", } return emotion_map.get(dominant, "neutral"), score def _analyze_face_gemini(image_bytes: bytes) -> tuple[str, float]: if GEMINI_CLIENT is None: raise ValueError("GEMINI_API_KEY not configured") prompt = """ You are an emotion detection AI. Analyze the facial expression in this image. DO NOT use 'disgust'. Return ONLY a valid JSON object with this exact structure: { "dominant_emotion": "happy|sad|angry|neutral|fear|surprise", "confidence": 0.0-1.0 } """ response = GEMINI_CLIENT.models.generate_content( model=GEMINI_MODEL, contents=[ { "role": "user", "parts": [ {"text": prompt}, { "inline_data": { "mime_type": "image/jpeg", "data": base64.b64encode(image_bytes).decode("utf-8"), } }, ], } ], ) text = response.text or "" try: result = json.loads(text) except Exception: raise ValueError(f"Gemini response not JSON: {text}") emotion_map = { "happy": "joy", "sad": "sadness", "angry": "anger", "fear": "fear", "surprise": "surprise", "neutral": "neutral", } dominant = result.get("dominant_emotion", "neutral").lower() confidence = float(result.get("confidence", 0.5)) return emotion_map.get(dominant, "neutral"), confidence def _analyze_text_gemini(text: str) -> tuple[str, float]: if GEMINI_CLIENT is None: raise ValueError("GEMINI_API_KEY not configured") prompt = f""" Analyze the emotional tone of this text: \"{text}\" Return ONLY a valid JSON object with this exact structure: {{ "dominant_emotion": "joy|sadness|anger|neutral|fear|surprise", "confidence": 0.0-1.0 }} """ response = GEMINI_CLIENT.models.generate_content( model=GEMINI_MODEL, contents=prompt, ) raw_text = response.text or "" try: result = json.loads(raw_text) except Exception: raise ValueError(f"Gemini response not JSON: {raw_text}") dominant = result.get("dominant_emotion", "neutral").lower() confidence = float(result.get("confidence", 0.5)) return dominant, confidence @app.get("/health") def health() -> dict: return {"status": "ok"} @app.get("/search") def search_songs_endpoint(query: str, limit: int = 20) -> dict: try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None songs = search_songs(query, limit=limit, oauth_file=oauth_file) return {"query": query, "limit": limit, "songs": songs} except YouTubeMusicError as exc: raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc @app.get("/song/{video_id}") def get_song_endpoint(video_id: str) -> dict: try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None song = get_song_info(video_id, oauth_file=oauth_file) return song except YouTubeMusicError as exc: raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc @app.get("/artists/search") def search_artists_endpoint(query: str, limit: int = 10) -> dict: try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None artists = search_artists(query, limit=limit, oauth_file=oauth_file) return {"query": query, "limit": limit, "artists": artists} except YouTubeMusicError as exc: raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc @app.get("/artists/{artist_id}/songs") def artist_songs_endpoint(artist_id: str, limit: int = 50) -> dict: try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None songs = get_artist_songs(artist_id, limit=limit, oauth_file=oauth_file) return {"artist_id": artist_id, "limit": limit, "songs": songs} except YouTubeMusicError as exc: raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc except Exception as exc: raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc @app.post("/mood/text", response_model=RecommendationResponse) def mood_from_text(body: TextMoodRequest) -> RecommendationResponse: if not body.text.strip(): raise HTTPException(status_code=400, detail="Text cannot be empty") try: label, score = _analyze_text_gemini(body.text) except Exception as e: raise HTTPException(status_code=500, detail=f"Emotion analysis failed: {str(e)}") try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None song = recommend_song_for_emotion(label, source="text", oauth_file=oauth_file) except Exception: song = {} return RecommendationResponse( mood_label=label, mood_score=score, video_id=song.get("video_id"), title=song.get("title"), artists=song.get("artists", []), album=song.get("album"), duration=song.get("duration"), image_url=song.get("image_url"), external_url=song.get("external_url"), ) @app.post("/mood/face", response_model=RecommendationResponse) async def mood_from_face(file: UploadFile = File(...)) -> RecommendationResponse: contents = await file.read() try: label, score = _analyze_face_gemini(contents) except Exception: try: label, score = _analyze_face_deepface(contents) except Exception as e: raise HTTPException(status_code=500, detail=f"Face analysis failed: {str(e)}") try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None song = recommend_song_for_emotion(label, source="face", oauth_file=oauth_file) except Exception: song = {} return RecommendationResponse( mood_label=label, mood_score=score, video_id=song.get("video_id"), title=song.get("title"), artists=song.get("artists", []), album=song.get("album"), duration=song.get("duration"), image_url=song.get("image_url"), external_url=song.get("external_url"), ) @app.post("/mood/face/live", response_model=RecommendationResponse) async def mood_from_face_live(file: UploadFile = File(...)) -> RecommendationResponse: contents = await file.read() try: label, score = _analyze_face_deepface(contents) except Exception as e: raise HTTPException(status_code=500, detail=f"Live face analysis failed: {str(e)}") try: oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None song = recommend_song_for_emotion(label, source="face", oauth_file=oauth_file) except Exception: song = {} return RecommendationResponse( mood_label=label, mood_score=score, video_id=song.get("video_id"), title=song.get("title"), artists=song.get("artists", []), album=song.get("album"), duration=song.get("duration"), image_url=song.get("image_url"), external_url=song.get("external_url"), ) print("⏳ Waking up local AI...") try: DeepFace.analyze(np.zeros((224, 224, 3), dtype=np.uint8), actions=['emotion'], enforce_detection=False) except: pass print("✅ SYSTEM READY!") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)