Ahang / app.py
Opera8's picture
Update app.py
70573b7 verified
import os
import json
import random
import asyncio
import re
import fcntl
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
import google.generativeai as genai
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
# --- تنظیمات محدودیت ---
MAX_FREE_DAILY_REQUESTS = 5
USAGE_DB_FILE = "usage_data.json"
# --- بخش بارگذاری کلیدها ---
all_keys_str = os.getenv("ALL_GEMINI_API_KEYS", "")
raw_keys = re.split(r'[,\s\n]+', all_keys_str)
GEMINI_KEYS = []
for k in raw_keys:
clean_key = k.strip().replace('"', '').replace("'", "").replace("\r", "").replace("\n", "")
if len(clean_key) > 30:
GEMINI_KEYS.append(clean_key)
if GEMINI_KEYS:
print(f"Loaded {len(GEMINI_KEYS)} keys.")
else:
print("Warning: No valid keys found.")
executor = ThreadPoolExecutor(max_workers=10)
class IdeaRequest(BaseModel):
idea: str
fingerprint: str
is_premium: bool = False
is_instrumental: bool = False # فیلد جدید برای حالت بی‌کلام
@app.get("/", response_class=HTMLResponse)
async def read_root():
try:
with open("index.html", "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
return "<h1>Error: index.html not found</h1>"
def sync_generate_content(prompt):
max_retries = 3
if not GEMINI_KEYS:
return None
for _ in range(max_retries):
try:
current_key = random.choice(GEMINI_KEYS)
genai.configure(api_key=current_key)
model = genai.GenerativeModel('gemini-2.5-flash')
response = model.generate_content(prompt)
clean_json = response.text.replace("```json", "").replace("```", "").strip()
return json.loads(clean_json)
except Exception:
continue
return None
def check_and_increment_usage(ip: str, fingerprint: str):
today = datetime.now().strftime("%Y-%m-%d")
if not os.path.exists(USAGE_DB_FILE):
try:
with open(USAGE_DB_FILE, 'w') as f:
json.dump({}, f)
except OSError:
pass
try:
f = open(USAGE_DB_FILE, 'r+')
except FileNotFoundError:
f = open(USAGE_DB_FILE, 'w+')
json.dump({}, f)
f.seek(0)
with f:
try:
fcntl.flock(f, fcntl.LOCK_EX)
try:
content = f.read()
data = json.loads(content) if content else {}
except json.JSONDecodeError:
data = {}
ip_key = f"ip:{ip}"
fp_key = f"fp:{fingerprint}"
ip_record = data.get(ip_key, {"date": today, "count": 0})
if ip_record["date"] != today:
ip_record = {"date": today, "count": 0}
fp_record = data.get(fp_key, {"date": today, "count": 0})
if fp_record["date"] != today:
fp_record = {"date": today, "count": 0}
if ip_record["count"] >= MAX_FREE_DAILY_REQUESTS:
return False
if fingerprint and fp_record["count"] >= MAX_FREE_DAILY_REQUESTS:
return False
ip_record["count"] += 1
data[ip_key] = ip_record
if fingerprint:
fp_record["count"] += 1
data[fp_key] = fp_record
f.seek(0)
f.truncate()
json.dump(data, f)
f.flush()
return True
finally:
fcntl.flock(f, fcntl.LOCK_UN)
@app.post("/api/refine")
async def refine_text(request: IdeaRequest, req: Request):
if not GEMINI_KEYS:
raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.")
if not request.is_premium:
client_ip = req.client.host
client_fp = request.fingerprint
try:
allowed = check_and_increment_usage(client_ip, client_fp)
except Exception:
return JSONResponse(content={"error": "System Error checking limits"}, status_code=500)
if not allowed:
return JSONResponse(content={"error": "LIMIT_REACHED"}, status_code=429)
# لاجیک جدید برای حالت بی‌کلام
instruction = ""
if request.is_instrumental:
instruction = """
**INSTRUMENTAL MODE ACTIVE:**
- The user wants an instrumental song (NO LYRICS).
- **lyrics field MUST be an empty string ""**.
- Focus entirely on the `music_prompt` to describe the instruments and mood vividly.
"""
else:
instruction = """
- **Lyrics (Match User's Language):**
- Detect the language of the "User Input".
- Write the lyrics entirely in that detected language.
- Structure tags: [Verse 1], [Chorus], etc.
- "lyrics" field must contain ONLY words to be sung.
"""
prompt = f"""
You are a professional songwriter and music producer.
Task: Convert the user's input into a COMPLETE song structure and a music generation prompt.
{instruction}
1. **Music Prompt (English):** Describe the mood, instruments, BPM, and style suitable for an AI music generator.
Output strictly in JSON format:
{{
"music_prompt": "YOUR ENGLISH PROMPT HERE",
"lyrics": "YOUR FULL LYRICS HERE (OR EMPTY STRING IF INSTRUMENTAL)"
}}
User Input: {request.idea}
"""
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, sync_generate_content, prompt)
if result:
return JSONResponse(content=result)
else:
return JSONResponse(content={"error": "Server busy. Please try again later."}, status_code=503)