Spaces:
Paused
Paused
| import os | |
| import json | |
| import threading | |
| import logging | |
| import time | |
| import requests | |
| import re # ✅ ۱. اضافه کردن این خط | |
| from fastapi import FastAPI, File, Form, UploadFile, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from PIL import Image | |
| import google.generativeai as genai | |
| from google.api_core.exceptions import ResourceExhausted, InvalidArgument | |
| from deep_translator import GoogleTranslator | |
| # ... (all of the earlier code, up to the process_request function, remains unchanged) ... | |
# --- Settings ---
# Comma-separated Gemini API keys, read from the environment (None if unset).
ALL_GEMINI_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
gemini_keys = []  # parsed, non-empty keys; stays empty when nothing is configured
key_index = 0  # position in gemini_keys of the next key to hand out
key_rotation_lock = threading.Lock()  # held while key_index is read and advanced
MAX_ATTEMPTS_PER_REQUEST = 2  # upper bound on keys tried for a single request
GEMINI_REQUEST_TIMEOUT = 30  # passed as the "timeout" request option to Gemini

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Raise the threshold of chatty third-party loggers to WARNING.
logging.getLogger('google.generativeai').setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger('deep_translator').setLevel(logging.WARNING)

app = FastAPI(title="Self-Contained Gemini Worker with Internal Translation Fallback")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Split the configured value on commas, dropping blank entries.
if ALL_GEMINI_API_KEYS:
    gemini_keys = [
        stripped
        for stripped in (piece.strip() for piece in ALL_GEMINI_API_KEYS.split(','))
        if stripped
    ]
    if not gemini_keys:
        logging.error("❌ CRITICAL: ALL_GEMINI_API_KEYS variable is empty.")
    else:
        logging.info(f"✅ Execution Worker is active with {len(gemini_keys)} keys and internal translation fallback.")
else:
    logging.error("❌ CRITICAL: No Gemini API keys found.")
def translate_prompt_fallback(persian_prompt):
    """Build an animation/negative prompt pair without calling Gemini.

    The prompt is translated from Persian ('fa') to English ('en') with
    GoogleTranslator and a fixed quality suffix is appended. An empty or
    whitespace-only prompt yields a default cinematic prompt, and any
    translation failure yields a generic prompt; this function does not
    propagate translation errors.

    Args:
        persian_prompt: The user's prompt text (must support ``.strip()``).

    Returns:
        A dict with the keys "animation_prompt" and "negative_prompt".
    """
    # The same negative prompt is used on every return path.
    shared_negative = "ugly, deformed, noisy, blurry, distorted, grainy, shaking, jittery, flickering, unnatural movement, static image, watermark, text, signature, cartoon, anime, 3d render."

    logging.warning(f"⚠️ All {MAX_ATTEMPTS_PER_REQUEST} Gemini key attempts failed. Switching to internal translation fallback.")

    if not persian_prompt.strip():
        logging.info("Fallback: User prompt is empty, using a default cinematic prompt.")
        return {
            "animation_prompt": "Cinematic animation, beautiful subtle movements, high detail, 8k, photorealistic.",
            "negative_prompt": shared_negative,
        }

    try:
        logging.info(f"Translating internally: '{persian_prompt[:50]}...'")
        english = GoogleTranslator(source='fa', target='en').translate(persian_prompt)
        # An empty or non-string result is routed to the handler below.
        if not (english and isinstance(english, str)):
            raise ValueError(f"Translator returned an empty or invalid response: {english}")
        logging.info(f"✅ Internal fallback success: Translated prompt to '{english[:50]}...'")
        return {
            "animation_prompt": f"{english}, photorealistic, high detail, smooth motion, 8k, cinematic",
            "negative_prompt": shared_negative,
        }
    except Exception as e:
        logging.error(f"❌ Fallback failed: Could not translate using deep-translator. Error: {e}")
        return {
            "animation_prompt": "cinematic animation of the subject",
            "negative_prompt": shared_negative,
        }
def health_check():
    """Return a status payload reporting how many Gemini keys are loaded.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` elsewhere — confirm.

    Returns:
        A dict with a single "status" key holding a human-readable message.
    """
    status_line = f"Execution worker online with {len(gemini_keys)} keys and internal translation fallback."
    return {"status": status_line}
async def process_request(
    prompt: str = Form(...),
    image: UploadFile = File(...)
):
    """Enhance an animation prompt with Gemini, falling back to translation.

    Up to ``min(MAX_ATTEMPTS_PER_REQUEST, len(gemini_keys))`` API keys are
    tried in round-robin order. The first attempt whose response contains a
    parseable JSON object wins. If no keys are configured, or every attempt
    fails, the user's text is extracted from ``prompt`` and passed to
    ``translate_prompt_fallback``.

    All network-bound work is awaited or run in a worker thread so that this
    coroutine does not block the event loop while waiting on Gemini or on
    the translator.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` elsewhere — confirm.

    Args:
        prompt: The full master prompt sent to the model (form field).
        image: The uploaded image sent to the model alongside the prompt.

    Returns:
        The JSON object parsed from the model response, or the dict produced
        by ``translate_prompt_fallback``.

    Raises:
        HTTPException: 400 if the upload cannot be opened as an image.
    """
    global key_index

    if not gemini_keys:
        # No keys configured: skip Gemini and use the fallback directly.
        return await _run_translation_fallback(prompt)

    try:
        img = Image.open(image.file)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid image file.")

    num_attempts = min(MAX_ATTEMPTS_PER_REQUEST, len(gemini_keys))
    for i in range(num_attempts):
        # Take the next key and advance the shared index under the lock.
        with key_rotation_lock:
            current_key_idx = key_index
            selected_api_key = gemini_keys[current_key_idx]
            key_index = (key_index + 1) % len(gemini_keys)
        logging.info(f"Attempt {i + 1}/{num_attempts} with key index {current_key_idx}...")
        try:
            # NOTE(review): genai.configure is a module-level call, so the
            # configured key is presumably shared process-wide — confirm key
            # isolation between concurrent requests.
            genai.configure(api_key=selected_api_key)
            model = genai.GenerativeModel('gemini-2.5-flash')
            # Awaiting the async variant keeps the event loop free while the
            # request is in flight; the blocking call would stall every
            # other request for up to GEMINI_REQUEST_TIMEOUT.
            response = await model.generate_content_async(
                [prompt, img],
                request_options={"timeout": GEMINI_REQUEST_TIMEOUT},
            )
            result = _extract_json_object(response.text)
            logging.info(f"✅ Enhancement success with key index {current_key_idx}.")
            return result
        except (ResourceExhausted, InvalidArgument) as e:
            logging.warning(f"⚠️ Key index {current_key_idx} failed: {type(e).__name__}. Trying next...")
        except Exception as e:
            # Best effort: any other failure is logged and the next key is tried.
            error_str = str(e).lower()
            if "deadline exceeded" in error_str or "timeout" in error_str or "504" in error_str:
                logging.warning(f"⏰ Key index {current_key_idx} timed out. Trying next...")
            else:
                logging.error(f"❌ Unknown error with key index {current_key_idx}: {e}. Trying next...")

    # Every attempt failed: fall back to translating the user's own text.
    return await _run_translation_fallback(prompt)


async def _run_translation_fallback(master_prompt):
    """Extract the user's text and run the translation fallback off the event loop."""
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this file.
    from fastapi.concurrency import run_in_threadpool

    user_prompt_text = _extract_user_prompt(master_prompt)
    # translate_prompt_fallback is synchronous, so it runs in a worker thread.
    return await run_in_threadpool(translate_prompt_fallback, user_prompt_text)


def _extract_user_prompt(master_prompt):
    """Return the quoted user idea embedded in the master prompt, or "" if none."""
    if not isinstance(master_prompt, str):
        return ""
    # Preferred form: the quoted text that follows the labelled section header.
    match = re.search(r'User\'s Animation Idea \(in Persian\):\*\* "(.*?)"', master_prompt, re.DOTALL)
    if match:
        return match.group(1).strip()
    # Otherwise try a simpler pattern: perhaps only a quoted text was sent.
    match_simple = re.search(r'"(.*?)"', master_prompt)
    if match_simple:
        return match_simple.group(1).strip()
    return ""  # neither pattern was found


def _extract_json_object(text_response):
    """Parse the outermost ``{...}`` span of a model response as JSON.

    Raises:
        ValueError: If no ``{...}`` span is present, or (as the
            ``json.JSONDecodeError`` subclass) if the span is not valid JSON.
    """
    json_start = text_response.find('{')
    json_end = text_response.rfind('}') + 1
    # Also reject a missing closing brace, or one that precedes the opening
    # brace; either would otherwise produce an empty slice.
    if json_start == -1 or json_end <= json_start:
        raise ValueError("AI response did not contain valid JSON.")
    return json.loads(text_response[json_start:json_end])