| | import os |
| | import json |
| | import random |
| | import asyncio |
| | import re |
| | import fcntl |
| | import time |
| | from datetime import datetime |
| | from concurrent.futures import ThreadPoolExecutor |
| | from fastapi import FastAPI, HTTPException, Request |
| | from fastapi.responses import HTMLResponse, JSONResponse |
| | from pydantic import BaseModel |
| | import google.generativeai as genai |
| | from dotenv import load_dotenv |
| |
|
| | load_dotenv() |
| |
|
| | app = FastAPI() |
| |
|
| | |
| | MAX_FREE_DAILY_REQUESTS = 5 |
| | USAGE_DB_FILE = "usage_data.json" |
| |
|
| | |
| | all_keys_str = os.getenv("ALL_GEMINI_API_KEYS", "") |
| | raw_keys = re.split(r'[,\s\n]+', all_keys_str) |
| |
|
| | GEMINI_KEYS = [] |
| | for k in raw_keys: |
| | clean_key = k.strip().replace('"', '').replace("'", "").replace("\r", "").replace("\n", "") |
| | if len(clean_key) > 30: |
| | GEMINI_KEYS.append(clean_key) |
| |
|
| | if GEMINI_KEYS: |
| | print(f"Loaded {len(GEMINI_KEYS)} keys.") |
| | else: |
| | print("Warning: No valid keys found.") |
| |
|
| | executor = ThreadPoolExecutor(max_workers=10) |
| |
|
| | class IdeaRequest(BaseModel): |
| | idea: str |
| | fingerprint: str |
| | is_premium: bool = False |
| | is_instrumental: bool = False |
| |
|
| | @app.get("/", response_class=HTMLResponse) |
| | async def read_root(): |
| | try: |
| | with open("index.html", "r", encoding="utf-8") as f: |
| | return f.read() |
| | except FileNotFoundError: |
| | return "<h1>Error: index.html not found</h1>" |
| |
|
| | def sync_generate_content(prompt): |
| | max_retries = 3 |
| | if not GEMINI_KEYS: |
| | return None |
| |
|
| | for _ in range(max_retries): |
| | try: |
| | current_key = random.choice(GEMINI_KEYS) |
| | genai.configure(api_key=current_key) |
| | model = genai.GenerativeModel('gemini-2.5-flash') |
| | response = model.generate_content(prompt) |
| | clean_json = response.text.replace("```json", "").replace("```", "").strip() |
| | return json.loads(clean_json) |
| | except Exception: |
| | continue |
| | return None |
| |
|
| | def check_and_increment_usage(ip: str, fingerprint: str): |
| | today = datetime.now().strftime("%Y-%m-%d") |
| | if not os.path.exists(USAGE_DB_FILE): |
| | try: |
| | with open(USAGE_DB_FILE, 'w') as f: |
| | json.dump({}, f) |
| | except OSError: |
| | pass |
| |
|
| | try: |
| | f = open(USAGE_DB_FILE, 'r+') |
| | except FileNotFoundError: |
| | f = open(USAGE_DB_FILE, 'w+') |
| | json.dump({}, f) |
| | f.seek(0) |
| |
|
| | with f: |
| | try: |
| | fcntl.flock(f, fcntl.LOCK_EX) |
| | try: |
| | content = f.read() |
| | data = json.loads(content) if content else {} |
| | except json.JSONDecodeError: |
| | data = {} |
| |
|
| | ip_key = f"ip:{ip}" |
| | fp_key = f"fp:{fingerprint}" |
| | |
| | ip_record = data.get(ip_key, {"date": today, "count": 0}) |
| | if ip_record["date"] != today: |
| | ip_record = {"date": today, "count": 0} |
| | |
| | fp_record = data.get(fp_key, {"date": today, "count": 0}) |
| | if fp_record["date"] != today: |
| | fp_record = {"date": today, "count": 0} |
| |
|
| | if ip_record["count"] >= MAX_FREE_DAILY_REQUESTS: |
| | return False |
| | if fingerprint and fp_record["count"] >= MAX_FREE_DAILY_REQUESTS: |
| | return False |
| |
|
| | ip_record["count"] += 1 |
| | data[ip_key] = ip_record |
| | |
| | if fingerprint: |
| | fp_record["count"] += 1 |
| | data[fp_key] = fp_record |
| |
|
| | f.seek(0) |
| | f.truncate() |
| | json.dump(data, f) |
| | f.flush() |
| | return True |
| |
|
| | finally: |
| | fcntl.flock(f, fcntl.LOCK_UN) |
| |
|
| | @app.post("/api/refine") |
| | async def refine_text(request: IdeaRequest, req: Request): |
| | if not GEMINI_KEYS: |
| | raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.") |
| |
|
| | if not request.is_premium: |
| | client_ip = req.client.host |
| | client_fp = request.fingerprint |
| | try: |
| | allowed = check_and_increment_usage(client_ip, client_fp) |
| | except Exception: |
| | return JSONResponse(content={"error": "System Error checking limits"}, status_code=500) |
| | |
| | if not allowed: |
| | return JSONResponse(content={"error": "LIMIT_REACHED"}, status_code=429) |
| |
|
| | |
| | instruction = "" |
| | if request.is_instrumental: |
| | instruction = """ |
| | **INSTRUMENTAL MODE ACTIVE:** |
| | - The user wants an instrumental song (NO LYRICS). |
| | - **lyrics field MUST be an empty string ""**. |
| | - Focus entirely on the `music_prompt` to describe the instruments and mood vividly. |
| | """ |
| | else: |
| | instruction = """ |
| | - **Lyrics (Match User's Language):** |
| | - Detect the language of the "User Input". |
| | - Write the lyrics entirely in that detected language. |
| | - Structure tags: [Verse 1], [Chorus], etc. |
| | - "lyrics" field must contain ONLY words to be sung. |
| | """ |
| |
|
| | prompt = f""" |
| | You are a professional songwriter and music producer. |
| | Task: Convert the user's input into a COMPLETE song structure and a music generation prompt. |
| | |
| | {instruction} |
| | |
| | 1. **Music Prompt (English):** Describe the mood, instruments, BPM, and style suitable for an AI music generator. |
| | |
| | Output strictly in JSON format: |
| | {{ |
| | "music_prompt": "YOUR ENGLISH PROMPT HERE", |
| | "lyrics": "YOUR FULL LYRICS HERE (OR EMPTY STRING IF INSTRUMENTAL)" |
| | }} |
| | |
| | User Input: {request.idea} |
| | """ |
| | |
| | loop = asyncio.get_running_loop() |
| | result = await loop.run_in_executor(executor, sync_generate_content, prompt) |
| | |
| | if result: |
| | return JSONResponse(content=result) |
| | else: |
| | return JSONResponse(content={"error": "Server busy. Please try again later."}, status_code=503) |