"""FastAPI service that turns a user's idea into song lyrics and a music prompt.

The service rotates between several Gemini API keys and enforces a per-day
request quota (keyed by client IP and browser fingerprint) for non-premium
callers. Quota counters are stored in a small JSON file guarded by ``flock``.
"""

import os
import json
import random
import asyncio
import re
import fcntl
import time
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
import google.generativeai as genai
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

app = FastAPI()

# --- Rate-limit settings ---
MAX_FREE_DAILY_REQUESTS = 5
USAGE_DB_FILE = "usage_data.json"

# --- API key loading ---
# ALL_GEMINI_API_KEYS holds the keys separated by commas and/or whitespace.
all_keys_str = os.getenv("ALL_GEMINI_API_KEYS", "")
raw_keys = re.split(r'[,\s\n]+', all_keys_str)

# Characters removed from every candidate key (stray quotes and line endings).
_KEY_JUNK = str.maketrans("", "", "\"'\r\n")

# Candidates of 30 characters or fewer are discarded as invalid.
GEMINI_KEYS = [
    clean_key
    for clean_key in (k.strip().translate(_KEY_JUNK) for k in raw_keys)
    if len(clean_key) > 30
]

if GEMINI_KEYS:
    print(f"Loaded {len(GEMINI_KEYS)} keys.")
else:
    print("Warning: No valid keys found.")

# Worker pool for the blocking Gemini SDK calls.
executor = ThreadPoolExecutor(max_workers=10)


class IdeaRequest(BaseModel):
    """Request body of ``POST /api/refine``."""

    idea: str
    fingerprint: str
    is_premium: bool = False
    is_instrumental: bool = False  # New field: instrumental (no lyrics) mode


@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve ``index.html`` from the working directory.

    Returns the file content, or a 404 response carrying a short error text
    when the file does not exist.
    """
    try:
        with open("index.html", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        # A missing page is reported with an error status rather than 200.
        return HTMLResponse(
            content="\n\nError: index.html not found\n\n",
            status_code=404,
        )


def sync_generate_content(prompt):
    """Ask Gemini for a JSON answer to *prompt*, retrying on failure.

    Each attempt picks a random key from ``GEMINI_KEYS``. The reply text is
    stripped of Markdown code fences and parsed as JSON.

    Returns the parsed JSON object (a ``dict``), or ``None`` when no keys are
    configured or every attempt failed.
    """
    max_retries = 3
    if not GEMINI_KEYS:
        return None

    for attempt in range(1, max_retries + 1):
        try:
            # NOTE(review): genai.configure() sets process-wide state while
            # this function runs on several executor threads; a call may end
            # up using a key chosen by another thread. Confirm against the
            # SDK documentation whether that matters here.
            current_key = random.choice(GEMINI_KEYS)
            genai.configure(api_key=current_key)
            model = genai.GenerativeModel('gemini-2.5-flash')
            response = model.generate_content(prompt)
            clean_json = response.text.replace("```json", "").replace("```", "").strip()
            parsed = json.loads(clean_json)
        except Exception as exc:
            # Any failure (API error, blocked reply, invalid JSON) triggers a
            # retry, but it is logged so that outages are diagnosable.
            logger.warning(
                "Gemini attempt %d/%d failed: %s: %s",
                attempt, max_retries, type(exc).__name__, exc,
            )
            continue

        if isinstance(parsed, dict):
            return parsed
        # Valid JSON that is not an object cannot carry the requested fields.
        logger.warning(
            "Gemini attempt %d/%d returned a JSON %s instead of an object",
            attempt, max_retries, type(parsed).__name__,
        )
    return None


def _read_usage(f):
    """Return the usage mapping stored in the open file *f* (``{}`` if unusable)."""
    content = f.read()
    if not content:
        return {}
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return {}
    # A non-object root (list, number, ...) is treated like a corrupt file.
    return data if isinstance(data, dict) else {}


def _todays_record(data, key, today):
    """Return the counter record for *key*, reset when stale or malformed."""
    record = data.get(key)
    if (
        isinstance(record, dict)
        and record.get("date") == today
        and isinstance(record.get("count"), int)
    ):
        return record
    return {"date": today, "count": 0}


def check_and_increment_usage(ip: str, fingerprint: str):
    """Check the daily free quota for a caller and count this request.

    The counters live in ``USAGE_DB_FILE`` under the keys ``ip:<ip>`` and
    ``fp:<fingerprint>``; the file is held under an exclusive ``flock`` for
    the whole read-modify-write cycle. An empty *fingerprint* is neither
    checked nor counted.

    Returns ``True`` when the request is allowed (the counters have been
    incremented) and ``False`` when either counter has already reached
    ``MAX_FREE_DAILY_REQUESTS`` today.

    Raises ``OSError`` when the usage file cannot be opened or locked.
    """
    today = datetime.now().strftime("%Y-%m-%d")

    # O_CREAT without O_TRUNC creates the file if missing but never wipes
    # existing data, so no content is touched before the lock is held.
    fd = os.open(USAGE_DB_FILE, os.O_RDWR | os.O_CREAT, 0o644)
    with os.fdopen(fd, "r+", encoding="utf-8") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            data = _read_usage(f)

            ip_key = f"ip:{ip}"
            fp_key = f"fp:{fingerprint}"
            ip_record = _todays_record(data, ip_key, today)
            fp_record = _todays_record(data, fp_key, today)

            if ip_record["count"] >= MAX_FREE_DAILY_REQUESTS:
                return False
            if fingerprint and fp_record["count"] >= MAX_FREE_DAILY_REQUESTS:
                return False

            # Records from other days would be reset on their next use
            # anyway; dropping them keeps the file from growing forever.
            data = {
                key: record
                for key, record in data.items()
                if isinstance(record, dict) and record.get("date") == today
            }

            ip_record["count"] += 1
            data[ip_key] = ip_record
            if fingerprint:
                fp_record["count"] += 1
                data[fp_key] = fp_record

            # Rewrite the file in place while still holding the lock.
            f.seek(0)
            f.truncate()
            json.dump(data, f)
            f.flush()
            return True
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


# Instruction block used when the caller asks for an instrumental track.
_INSTRUMENTAL_INSTRUCTION = """
**INSTRUMENTAL MODE ACTIVE:**
- The user wants an instrumental song (NO LYRICS).
- **lyrics field MUST be an empty string ""**.
- Focus entirely on the `music_prompt` to describe the instruments and mood vividly.
"""

# Instruction block used when lyrics are wanted.
_LYRICS_INSTRUCTION = """
- **Lyrics (Match User's Language):**
  - Detect the language of the "User Input".
  - Write the lyrics entirely in that detected language.
  - Structure tags: [Verse 1], [Chorus], etc.
  - "lyrics" field must contain ONLY words to be sung.
"""

# Doubled braces are literal braces in the rendered prompt (str.format).
_PROMPT_TEMPLATE = """
You are a professional songwriter and music producer.
Task: Convert the user's input into a COMPLETE song structure and a music generation prompt.

{instruction}

1. **Music Prompt (English):** Describe the mood, instruments, BPM, and style suitable for an AI music generator.

Output strictly in JSON format:
{{
    "music_prompt": "YOUR ENGLISH PROMPT HERE",
    "lyrics": "YOUR FULL LYRICS HERE (OR EMPTY STRING IF INSTRUMENTAL)"
}}

User Input: {idea}
"""


def _build_prompt(idea, is_instrumental):
    """Render the Gemini prompt for *idea* in lyrics or instrumental mode."""
    instruction = _INSTRUMENTAL_INSTRUCTION if is_instrumental else _LYRICS_INSTRUCTION
    return _PROMPT_TEMPLATE.format(instruction=instruction, idea=idea)


@app.post("/api/refine")
async def refine_text(request: IdeaRequest, req: Request):
    """Turn the submitted idea into a music prompt and (optionally) lyrics.

    Responses:
        200 - the JSON object produced by the model.
        429 - ``{"error": "LIMIT_REACHED"}`` when the free daily quota is used up.
        500 - no API key is configured, or the quota check itself failed.
        503 - every generation attempt failed.

    The quota is consumed before generation starts, so a request that ends
    in 503 still counts against the caller's daily limit.
    """
    if not GEMINI_KEYS:
        raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.")

    loop = asyncio.get_running_loop()

    # NOTE(review): is_premium comes straight from the request body, so any
    # client can skip the quota by sending true. It needs server-side
    # verification before it can be trusted.
    if not request.is_premium:
        # req.client can be None (no peer address available); such callers
        # share one "unknown" bucket instead of crashing the handler.
        client_ip = req.client.host if req.client else "unknown"
        client_fp = request.fingerprint
        try:
            # The check does blocking file I/O under a file lock, so it runs
            # in the default thread pool to keep the event loop responsive.
            allowed = await loop.run_in_executor(
                None, check_and_increment_usage, client_ip, client_fp
            )
        except Exception:
            logger.exception("Usage limit check failed")
            return JSONResponse(
                content={"error": "System Error checking limits"},
                status_code=500,
            )

        if not allowed:
            return JSONResponse(content={"error": "LIMIT_REACHED"}, status_code=429)

    # Instrumental mode swaps the lyrics instructions for a "no lyrics" block.
    prompt = _build_prompt(request.idea, request.is_instrumental)

    result = await loop.run_in_executor(executor, sync_generate_content, prompt)

    if result:
        return JSONResponse(content=result)
    return JSONResponse(
        content={"error": "Server busy. Please try again later."},
        status_code=503,
    )