import os
import requests
import json
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# --- CONFIGURATION ---
# 1. OpenAI/Azure Configuration
# AI_SERVICE_TOKEN may hold several tokens, comma-separated.
AI_SERVICE_TOKENS_RAW = os.environ.get("AI_SERVICE_TOKEN", "")
AI_SERVICE_TOKENS = [t.strip() for t in AI_SERVICE_TOKENS_RAW.split(",") if t.strip()]
OPENAI_API_URL = "https://models.inference.ai.azure.com/chat/completions"
OPENAI_MODEL_NAME = "gpt-4o-mini"

# 2. Google Gemini Configuration (Direct Google API)
# You need to set GOOGLE_API_KEY in your HF Space secrets.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
# Calls the gemma-3-27b-it model through the Gemini generateContent endpoint.
GEMINI_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemma-3-27b-it:generateContent?key={GOOGLE_API_KEY}"
app = FastAPI(
    title="AI Backend Service",
    description="Running on Hugging Face Spaces (Docker SDK)"
)
# --- MODELS ---
class AnalyzeRequest(BaseModel):
    filename: str
    model_provider: str = "openai"  # 'openai' or 'gemma' (maps to Gemini)
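# Illustrative request body for the analyze endpoint (the filename is hypothetical):
#   {"filename": "The.Matrix.1999.1080p.BluRay.mkv", "model_provider": "openai"}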
# --- HELPERS ---
def get_headers(token):
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
# --- ENDPOINTS ---
@app.get("/")  # route path assumed
def home():
    """Health check endpoint."""
    return {
        "status": "active",
        "platform": "Hugging Face Spaces",
        "tokens_loaded": len(AI_SERVICE_TOKENS),
        "google_api_enabled": bool(GOOGLE_API_KEY)
    }
@app.get("/check-limit")  # route path assumed
def check_limit():
    """
    Checks the rate-limit status of ALL configured AI service tokens (OpenAI only).
    """
    if not AI_SERVICE_TOKENS:
        return {"tokens_checked": 0, "results": [], "note": "OpenAI tokens missing"}
    results = []
    for i, token in enumerate(AI_SERVICE_TOKENS):
        headers = get_headers(token)
        # Minimal one-token request, used only to probe the rate-limit headers.
        payload = {
            "model": OPENAI_MODEL_NAME,
            "messages": [{"role": "user", "content": "Ping."}],
            "temperature": 0.1,
            "max_tokens": 1
        }
        try:
            response = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=10)
            token_status = {
                "token_index": i,
                "status_code": response.status_code,
                "valid": response.status_code == 200,
                "remaining": response.headers.get('x-ratelimit-remaining-requests', 'N/A')
            }
            results.append(token_status)
        except Exception as e:
            results.append({"token_index": i, "status_code": "ERROR", "error": str(e)})
    return {"tokens_checked": len(results), "results": results}
def call_openai_gpt4o(filename, tokens):
    payload = {
        "model": OPENAI_MODEL_NAME,
        "messages": [
            {"role": "system", "content": "You are an expert Movie and TV metadata analyst. Return ONLY raw JSON in the format: {\"title\": \"...\", \"year\": \"...\", \"isSeries\": false/true}. Analyze the following filename and extract the data."},
            {"role": "user", "content": f"Analyze: \"{filename}\""}
        ],
        "temperature": 0.1,
        "max_tokens": 500
    }
    last_error = ""
    # Try each token in order, failing over on rate-limit and auth errors.
    for i, token in enumerate(tokens):
        try:
            response = requests.post(OPENAI_API_URL, headers=get_headers(token), json=payload, timeout=30)
            if response.status_code == 200:
                content = response.json().get('choices', [{}])[0].get('message', {}).get('content')
                return content
            elif response.status_code in [429, 401, 403]:
                # Rate-limited or unauthorized: move on to the next token.
                last_error = f"Token {i}: {response.status_code}"
                continue
            else:
                last_error = f"Token {i} Error: {response.text}"
        except Exception as e:
            last_error = str(e)
            continue
    raise Exception(f"All OpenAI tokens failed. Last error: {last_error}")
def call_google_gemini(filename):
    if not GOOGLE_API_KEY:
        raise Exception("GOOGLE_API_KEY not configured.")
    # Construct the Gemini payload
    prompt = f"""
    You are an expert Movie and TV metadata analyst.
    Analyze the filename: "{filename}"
    Identify the title, year, and whether it is a series.
    Return ONLY a raw JSON object with this exact format:
    {{"title": "Movie Title", "year": "2024", "isSeries": false}}
    """
    payload = {
        "contents": [{
            "parts": [{"text": prompt}]
        }],
        "generationConfig": {
            "temperature": 0.1,
            "maxOutputTokens": 100,
            # Strict responseMimeType removed to avoid 400 errors on some models:
            # "responseMimeType": "application/json"
        }
    }
    response = requests.post(GEMINI_API_URL, headers={"Content-Type": "application/json"}, json=payload, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Google Gemini API Error {response.status_code}: {response.text}")
    result = response.json()
    # Extract text from the Gemini response structure
    try:
        return result['candidates'][0]['content']['parts'][0]['text']
    except (KeyError, IndexError):
        raise Exception(f"Unexpected response structure from Gemini: {str(result)}")
@app.post("/analyze")  # route path assumed
def analyze_filename(request: AnalyzeRequest):
    """
    Analyze a filename using the selected provider (openai or gemma/gemini).
    """
    raw_content = ""
    provider_used = request.model_provider
    try:
        if provider_used == "gemma":
            # Although the frontend sends "gemma", we map this to our Google Gemini function
            raw_content = call_google_gemini(request.filename)
        else:
            # Default to OpenAI
            if not AI_SERVICE_TOKENS:
                raise HTTPException(500, "OpenAI tokens missing.")
            raw_content = call_openai_gpt4o(request.filename, AI_SERVICE_TOKENS)
        # Parse the JSON output from either provider
        if raw_content:
            clean_content = raw_content.replace("```json", "").replace("```", "").strip()
            # Simple extraction of the JSON object if it is surrounded by text.
            start = clean_content.find('{')
            end = clean_content.rfind('}') + 1
            if start != -1 and end > start:  # rfind returns -1, so end == 0 when no '}' exists
                clean_content = clean_content[start:end]
            return json.loads(clean_content)
        return {"error": "No content returned", "provider": provider_used}
    except HTTPException:
        # Re-raise HTTP errors as-is instead of wrapping them in a generic 500.
        raise
    except Exception as e:
        print(f"Analysis Error ({provider_used}): {e}")
        raise HTTPException(status_code=500, detail=f"Analysis failed ({provider_used}): {str(e)}")
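# --- LOCAL ENTRYPOINT (assumed) ---
# A Docker SDK Space normally starts the server from its Dockerfile CMD; this
# guard is only a convenience sketch for running the app locally. Port 7860 is
# the port Hugging Face Spaces expects the app to listen on.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)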