| import os |
| import base64 |
| import json |
| import traceback |
|
|
| import numpy as np |
| import cv2 |
| import requests |
| from deepface import DeepFace |
| from dotenv import load_dotenv |
| from fastapi import FastAPI, File, HTTPException, UploadFile |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from google import genai |
|
|
| from ytmusic_client import ( |
| YouTubeMusicError, |
| search_songs, |
| get_song_info, |
| search_artists, |
| get_artist_songs, |
| recommend_song_for_emotion, |
| ) |
|
|
|
|
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' |
|
|
| load_dotenv() |
|
|
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") |
| GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-3-flash-preview") |
| if GEMINI_API_KEY: |
| GEMINI_API_KEY = GEMINI_API_KEY.strip().replace('"', '').replace("'", "") |
|
|
| if not GEMINI_API_KEY or GEMINI_API_KEY == "YOUR_API_KEY_HERE": |
| print("⚠️ WARNING: GEMINI_API_KEY not found or using placeholder.") |
| GEMINI_CLIENT = None |
| else: |
| masked_key = f"{GEMINI_API_KEY[:4]}...{GEMINI_API_KEY[-4:]}" |
| print(f"✅ API Key detected: {masked_key} (Length: {len(GEMINI_API_KEY)})") |
| print(f"✅ Using Gemini model: {GEMINI_MODEL}") |
| GEMINI_CLIENT = genai.Client(api_key=GEMINI_API_KEY) |
|
|
| YTMUSIC_OAUTH_FILE = os.getenv("YTMUSIC_OAUTH_FILE", "oauth.json") |
| YTMUSIC_CLIENT_ID = os.getenv("YTMUSIC_CLIENT_ID") |
| YTMUSIC_CLIENT_SECRET = os.getenv("YTMUSIC_CLIENT_SECRET") |
|
|
| if os.path.exists(YTMUSIC_OAUTH_FILE): |
| import json |
| with open(YTMUSIC_OAUTH_FILE, 'r') as f: |
| oauth_data = json.load(f) |
| if "oauth_credentials" in oauth_data: |
| print(f"✅ YouTube Music OAuth file found with credentials: {YTMUSIC_OAUTH_FILE}") |
| else: |
| print(f"ℹ️ YouTube Music OAuth file found but incomplete: {YTMUSIC_OAUTH_FILE}") |
| else: |
| print(f"ℹ️ YouTube Music OAuth file not found: {YTMUSIC_OAUTH_FILE}") |
| print(" Run: ytmusicapi oauth to set up authentication (optional)") |
|
|
|
|
| app = FastAPI(title="Ytapp – YouTube Music Mood-based Recommender") |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| class TextMoodRequest(BaseModel): |
| text: str |
|
|
|
|
| class RecommendationResponse(BaseModel): |
| mood_label: str |
| mood_score: float |
| video_id: str | None |
| title: str | None |
| artists: list[str] | None |
| album: str | None |
| duration: str | None |
| image_url: str | None |
| external_url: str | None |
|
|
|
|
| def _analyze_face_deepface(image_bytes: bytes) -> tuple[str, float]: |
| npimg = np.frombuffer(image_bytes, np.uint8) |
| img = cv2.imdecode(npimg, cv2.IMREAD_COLOR) |
| |
| result = DeepFace.analyze(img, actions=['emotion'], enforce_detection=False) |
| res = result[0] if isinstance(result, list) else result |
| |
| emotions_dict = {key: float(value) for key, value in res['emotion'].items() if key != 'disgust'} |
| total = sum(emotions_dict.values()) |
| if total > 0: |
| emotions_dict = {key: (value / total) * 100 for key, value in emotions_dict.items()} |
| |
| dominant = max(emotions_dict, key=emotions_dict.get) |
| score = emotions_dict[dominant] / 100.0 |
| |
| emotion_map = { |
| "happy": "joy", |
| "sad": "sadness", |
| "angry": "anger", |
| "fear": "fear", |
| "surprise": "surprise", |
| "neutral": "neutral", |
| } |
| |
| return emotion_map.get(dominant, "neutral"), score |
|
|
|
|
| def _analyze_face_gemini(image_bytes: bytes) -> tuple[str, float]: |
| if GEMINI_CLIENT is None: |
| raise ValueError("GEMINI_API_KEY not configured") |
|
|
| prompt = """ |
| You are an emotion detection AI. Analyze the facial expression in this image. |
| DO NOT use 'disgust'. |
| |
| Return ONLY a valid JSON object with this exact structure: |
| { |
| "dominant_emotion": "happy|sad|angry|neutral|fear|surprise", |
| "confidence": 0.0-1.0 |
| } |
| """ |
|
|
| response = GEMINI_CLIENT.models.generate_content( |
| model=GEMINI_MODEL, |
| contents=[ |
| { |
| "role": "user", |
| "parts": [ |
| {"text": prompt}, |
| { |
| "inline_data": { |
| "mime_type": "image/jpeg", |
| "data": base64.b64encode(image_bytes).decode("utf-8"), |
| } |
| }, |
| ], |
| } |
| ], |
| ) |
|
|
| text = response.text or "" |
| try: |
| result = json.loads(text) |
| except Exception: |
| raise ValueError(f"Gemini response not JSON: {text}") |
| |
| emotion_map = { |
| "happy": "joy", |
| "sad": "sadness", |
| "angry": "anger", |
| "fear": "fear", |
| "surprise": "surprise", |
| "neutral": "neutral", |
| } |
| |
| dominant = result.get("dominant_emotion", "neutral").lower() |
| confidence = float(result.get("confidence", 0.5)) |
| |
| return emotion_map.get(dominant, "neutral"), confidence |
|
|
|
|
| def _analyze_text_gemini(text: str) -> tuple[str, float]: |
| if GEMINI_CLIENT is None: |
| raise ValueError("GEMINI_API_KEY not configured") |
|
|
| prompt = f""" |
| Analyze the emotional tone of this text: \"{text}\" |
| |
| Return ONLY a valid JSON object with this exact structure: |
| {{ |
| "dominant_emotion": "joy|sadness|anger|neutral|fear|surprise", |
| "confidence": 0.0-1.0 |
| }} |
| """ |
|
|
| response = GEMINI_CLIENT.models.generate_content( |
| model=GEMINI_MODEL, |
| contents=prompt, |
| ) |
|
|
| raw_text = response.text or "" |
| try: |
| result = json.loads(raw_text) |
| except Exception: |
| raise ValueError(f"Gemini response not JSON: {raw_text}") |
|
|
| dominant = result.get("dominant_emotion", "neutral").lower() |
| confidence = float(result.get("confidence", 0.5)) |
|
|
| return dominant, confidence |
|
|
|
|
| @app.get("/health") |
| def health() -> dict: |
| return {"status": "ok"} |
|
|
|
|
| @app.get("/search") |
| def search_songs_endpoint(query: str, limit: int = 20) -> dict: |
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| songs = search_songs(query, limit=limit, oauth_file=oauth_file) |
| return {"query": query, "limit": limit, "songs": songs} |
| except YouTubeMusicError as exc: |
| raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc |
|
|
|
|
| @app.get("/song/{video_id}") |
| def get_song_endpoint(video_id: str) -> dict: |
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| song = get_song_info(video_id, oauth_file=oauth_file) |
| return song |
| except YouTubeMusicError as exc: |
| raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc |
|
|
|
|
| @app.get("/artists/search") |
| def search_artists_endpoint(query: str, limit: int = 10) -> dict: |
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| artists = search_artists(query, limit=limit, oauth_file=oauth_file) |
| return {"query": query, "limit": limit, "artists": artists} |
| except YouTubeMusicError as exc: |
| raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc |
|
|
|
|
| @app.get("/artists/{artist_id}/songs") |
| def artist_songs_endpoint(artist_id: str, limit: int = 50) -> dict: |
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| songs = get_artist_songs(artist_id, limit=limit, oauth_file=oauth_file) |
| return {"artist_id": artist_id, "limit": limit, "songs": songs} |
| except YouTubeMusicError as exc: |
| raise HTTPException(status_code=exc.status_code, detail=exc.message) from exc |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=f"Unexpected error: {str(exc)}") from exc |
|
|
|
|
| @app.post("/mood/text", response_model=RecommendationResponse) |
| def mood_from_text(body: TextMoodRequest) -> RecommendationResponse: |
| if not body.text.strip(): |
| raise HTTPException(status_code=400, detail="Text cannot be empty") |
|
|
| try: |
| label, score = _analyze_text_gemini(body.text) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Emotion analysis failed: {str(e)}") |
|
|
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| song = recommend_song_for_emotion(label, source="text", oauth_file=oauth_file) |
| except Exception: |
| song = {} |
|
|
| return RecommendationResponse( |
| mood_label=label, |
| mood_score=score, |
| video_id=song.get("video_id"), |
| title=song.get("title"), |
| artists=song.get("artists", []), |
| album=song.get("album"), |
| duration=song.get("duration"), |
| image_url=song.get("image_url"), |
| external_url=song.get("external_url"), |
| ) |
|
|
|
|
| @app.post("/mood/face", response_model=RecommendationResponse) |
| async def mood_from_face(file: UploadFile = File(...)) -> RecommendationResponse: |
| contents = await file.read() |
| |
| try: |
| label, score = _analyze_face_gemini(contents) |
| except Exception: |
| try: |
| label, score = _analyze_face_deepface(contents) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Face analysis failed: {str(e)}") |
|
|
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| song = recommend_song_for_emotion(label, source="face", oauth_file=oauth_file) |
| except Exception: |
| song = {} |
|
|
| return RecommendationResponse( |
| mood_label=label, |
| mood_score=score, |
| video_id=song.get("video_id"), |
| title=song.get("title"), |
| artists=song.get("artists", []), |
| album=song.get("album"), |
| duration=song.get("duration"), |
| image_url=song.get("image_url"), |
| external_url=song.get("external_url"), |
| ) |
|
|
|
|
| @app.post("/mood/face/live", response_model=RecommendationResponse) |
| async def mood_from_face_live(file: UploadFile = File(...)) -> RecommendationResponse: |
| contents = await file.read() |
| |
| try: |
| label, score = _analyze_face_deepface(contents) |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Live face analysis failed: {str(e)}") |
|
|
| try: |
| oauth_file = YTMUSIC_OAUTH_FILE if os.path.exists(YTMUSIC_OAUTH_FILE) else None |
| song = recommend_song_for_emotion(label, source="face", oauth_file=oauth_file) |
| except Exception: |
| song = {} |
|
|
| return RecommendationResponse( |
| mood_label=label, |
| mood_score=score, |
| video_id=song.get("video_id"), |
| title=song.get("title"), |
| artists=song.get("artists", []), |
| album=song.get("album"), |
| duration=song.get("duration"), |
| image_url=song.get("image_url"), |
| external_url=song.get("external_url"), |
| ) |
|
|
|
|
| print("⏳ Waking up local AI...") |
| try: |
| DeepFace.analyze(np.zeros((224, 224, 3), dtype=np.uint8), actions=['emotion'], enforce_detection=False) |
| except: |
| pass |
| print("✅ SYSTEM READY!") |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|