File size: 5,913 Bytes
51950d9 2a58068 0c994eb ba2d382 70573b7 8fee1a9 b7c77a2 0c994eb b7c77a2 de02d2c 0c994eb 51950d9 de02d2c 51950d9 c939581 b7c77a2 70573b7 b7c77a2 c939581 0c994eb ba2d382 0c994eb c939581 0c994eb c939581 0c994eb 2a58068 c939581 70573b7 51950d9 de02d2c 573d3ae de02d2c 0c994eb 70573b7 ba2d382 0c994eb c939581 0c994eb c939581 0c994eb c939581 b7c77a2 c939581 8fee1a9 70573b7 c939581 8fee1a9 c939581 8fee1a9 c939581 70573b7 c939581 70573b7 c939581 70573b7 c939581 b7c77a2 f522a40 b7c77a2 0c994eb b7c77a2 c939581 8fee1a9 70573b7 8fee1a9 b7c77a2 c939581 70573b7 b7c77a2 0c994eb 70573b7 0c994eb 573d3ae 0c994eb 70573b7 0c994eb 1ee66d8 0c994eb 1ee66d8 c939581 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | import os
import json
import random
import asyncio
import re
import fcntl
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
import google.generativeai as genai
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# --- تنظیمات محدودیت ---
MAX_FREE_DAILY_REQUESTS = 5
USAGE_DB_FILE = "usage_data.json"
# --- بخش بارگذاری کلیدها ---
all_keys_str = os.getenv("ALL_GEMINI_API_KEYS", "")
raw_keys = re.split(r'[,\s\n]+', all_keys_str)
GEMINI_KEYS = []
for k in raw_keys:
clean_key = k.strip().replace('"', '').replace("'", "").replace("\r", "").replace("\n", "")
if len(clean_key) > 30:
GEMINI_KEYS.append(clean_key)
if GEMINI_KEYS:
print(f"Loaded {len(GEMINI_KEYS)} keys.")
else:
print("Warning: No valid keys found.")
executor = ThreadPoolExecutor(max_workers=10)
class IdeaRequest(BaseModel):
idea: str
fingerprint: str
is_premium: bool = False
is_instrumental: bool = False # فیلد جدید برای حالت بیکلام
@app.get("/", response_class=HTMLResponse)
async def read_root():
try:
with open("index.html", "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
return "<h1>Error: index.html not found</h1>"
def sync_generate_content(prompt):
max_retries = 3
if not GEMINI_KEYS:
return None
for _ in range(max_retries):
try:
current_key = random.choice(GEMINI_KEYS)
genai.configure(api_key=current_key)
model = genai.GenerativeModel('gemini-2.5-flash')
response = model.generate_content(prompt)
clean_json = response.text.replace("```json", "").replace("```", "").strip()
return json.loads(clean_json)
except Exception:
continue
return None
def check_and_increment_usage(ip: str, fingerprint: str):
today = datetime.now().strftime("%Y-%m-%d")
if not os.path.exists(USAGE_DB_FILE):
try:
with open(USAGE_DB_FILE, 'w') as f:
json.dump({}, f)
except OSError:
pass
try:
f = open(USAGE_DB_FILE, 'r+')
except FileNotFoundError:
f = open(USAGE_DB_FILE, 'w+')
json.dump({}, f)
f.seek(0)
with f:
try:
fcntl.flock(f, fcntl.LOCK_EX)
try:
content = f.read()
data = json.loads(content) if content else {}
except json.JSONDecodeError:
data = {}
ip_key = f"ip:{ip}"
fp_key = f"fp:{fingerprint}"
ip_record = data.get(ip_key, {"date": today, "count": 0})
if ip_record["date"] != today:
ip_record = {"date": today, "count": 0}
fp_record = data.get(fp_key, {"date": today, "count": 0})
if fp_record["date"] != today:
fp_record = {"date": today, "count": 0}
if ip_record["count"] >= MAX_FREE_DAILY_REQUESTS:
return False
if fingerprint and fp_record["count"] >= MAX_FREE_DAILY_REQUESTS:
return False
ip_record["count"] += 1
data[ip_key] = ip_record
if fingerprint:
fp_record["count"] += 1
data[fp_key] = fp_record
f.seek(0)
f.truncate()
json.dump(data, f)
f.flush()
return True
finally:
fcntl.flock(f, fcntl.LOCK_UN)
@app.post("/api/refine")
async def refine_text(request: IdeaRequest, req: Request):
if not GEMINI_KEYS:
raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.")
if not request.is_premium:
client_ip = req.client.host
client_fp = request.fingerprint
try:
allowed = check_and_increment_usage(client_ip, client_fp)
except Exception:
return JSONResponse(content={"error": "System Error checking limits"}, status_code=500)
if not allowed:
return JSONResponse(content={"error": "LIMIT_REACHED"}, status_code=429)
# لاجیک جدید برای حالت بیکلام
instruction = ""
if request.is_instrumental:
instruction = """
**INSTRUMENTAL MODE ACTIVE:**
- The user wants an instrumental song (NO LYRICS).
- **lyrics field MUST be an empty string ""**.
- Focus entirely on the `music_prompt` to describe the instruments and mood vividly.
"""
else:
instruction = """
- **Lyrics (Match User's Language):**
- Detect the language of the "User Input".
- Write the lyrics entirely in that detected language.
- Structure tags: [Verse 1], [Chorus], etc.
- "lyrics" field must contain ONLY words to be sung.
"""
prompt = f"""
You are a professional songwriter and music producer.
Task: Convert the user's input into a COMPLETE song structure and a music generation prompt.
{instruction}
1. **Music Prompt (English):** Describe the mood, instruments, BPM, and style suitable for an AI music generator.
Output strictly in JSON format:
{{
"music_prompt": "YOUR ENGLISH PROMPT HERE",
"lyrics": "YOUR FULL LYRICS HERE (OR EMPTY STRING IF INSTRUMENTAL)"
}}
User Input: {request.idea}
"""
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, sync_generate_content, prompt)
if result:
return JSONResponse(content=result)
else:
return JSONResponse(content={"error": "Server busy. Please try again later."}, status_code=503) |