Ttspro4 / app.py
Hamed744's picture
Update app.py
439546f verified
# app.py - نسخه Worker بدون تقسیم متن (No Splitting)
import os
import sys
import traceback
import re
import struct
import time
import uuid
import shutil
import logging
import mimetypes
import threading
import random
import asyncio
import wave
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google import genai
from google.genai import types
import uvicorn
try:
from pydub import AudioSegment
PYDUB_AVAILABLE = True
except ImportError:
PYDUB_AVAILABLE = False
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# --- تنظیمات مدیریت کلیدها ---
GEMINI_CLIENTS_CACHE = {}
CLIENT_CACHE_LOCK = threading.Lock()
ALL_API_KEYS: list[str] = []
def _init_api_keys():
global ALL_API_KEYS
all_keys_string = os.environ.get("ALL_GEMINI_API_KEYS")
if all_keys_string:
ALL_API_KEYS = [key.strip() for key in all_keys_string.split(',') if key.strip()]
logging.info(f"✅ تعداد {len(ALL_API_KEYS)} کلید API جیمینای شناسایی و بارگذاری شد.")
if not ALL_API_KEYS:
logging.warning("⛔️ هشدار: هیچ Secret با نام ALL_GEMINI_API_KEYS یافت نشد!")
def get_random_api_key_and_client():
if not ALL_API_KEYS:
return None, None
key_to_use = random.choice(ALL_API_KEYS)
with CLIENT_CACHE_LOCK:
if key_to_use in GEMINI_CLIENTS_CACHE:
client = GEMINI_CLIENTS_CACHE[key_to_use]
else:
client = genai.Client(api_key=key_to_use)
GEMINI_CLIENTS_CACHE[key_to_use] = client
return key_to_use, client
FIXED_MODEL_NAME_STANDARD = "gemini-2.5-flash-preview-tts"
FIXED_MODEL_NAME_LIVE = "models/gemini-2.5-flash-native-audio-preview-12-2025"
DEFAULT_MAX_CHUNK_SIZE = 3800
DEFAULT_SLEEP_BETWEEN_REQUESTS = 5
def save_binary_file(file_name, data):
try:
with open(file_name, "wb") as f: f.write(data)
return file_name
except Exception as e:
logging.error(f"❌ خطا در ذخیره فایل {file_name}: {e}")
return None
def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
parameters = parse_audio_mime_type(mime_type)
bits_per_sample, rate = parameters["bits_per_sample"], parameters["rate"]
num_channels, data_size = 1, len(audio_data)
bytes_per_sample, block_align = bits_per_sample // 8, num_channels * (bits_per_sample // 8)
byte_rate, chunk_size = rate * block_align, 36 + data_size
header = struct.pack("<4sI4s4sIHHIIHH4sI", b"RIFF", chunk_size, b"WAVE", b"fmt ", 16, 1, num_channels, rate, byte_rate, block_align, bits_per_sample, b"data", data_size)
return header + audio_data
def parse_audio_mime_type(mime_type: str) -> dict[str, int]:
bits, rate = 16, 24000
for param in mime_type.split(";"):
param = param.strip()
if param.lower().startswith("rate="):
try: rate = int(param.split("=", 1)[1])
except: pass
elif param.startswith("audio/L"):
try: bits = int(param.split("L", 1)[1])
except: pass
return {"bits_per_sample": bits, "rate": rate}
def smart_text_split(text, max_size=3800):
# تغییر مهم: حذف کامل تقسیم‌بندی متن
# کل متن به عنوان یک تکه بازگردانده می‌شود تا هوش مصنوعی یکجا آن را پردازش کند
return [text]
def merge_audio_files_func(file_paths, output_path):
if not PYDUB_AVAILABLE: logging.warning("⚠️ pydub برای ادغام در دسترس نیست."); return False
try:
combined = AudioSegment.empty()
for i, fp in enumerate(file_paths):
if os.path.exists(fp): combined += AudioSegment.from_file(fp) + (AudioSegment.silent(duration=150) if i < len(file_paths) - 1 else AudioSegment.empty())
else: logging.warning(f"⚠️ فایل برای ادغام پیدا نشد: {fp}")
combined.export(output_path, format="wav")
return True
except Exception as e: logging.error(f"❌ خطا در ادغام فایل‌های صوتی: {e}"); return False
# --- منطق Gemini Live ---
async def generate_audio_live_with_retry(text, prompt, voice, session_id):
MAX_RETRIES = 50
live_config = types.LiveConnectConfig(
response_modalities=["AUDIO"],
speech_config=types.SpeechConfig(
voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice)
)
),
)
for attempt in range(MAX_RETRIES):
selected_api_key, _ = get_random_api_key_and_client()
if not selected_api_key: break
client = genai.Client(http_options={"api_version": "v1beta"}, api_key=selected_api_key)
unique_id_for_req = str(uuid.uuid4())[:8]
tts_prompt = f"Please read the following text naturally: '{text}' [ID: {unique_id_for_req}]"
if prompt: tts_prompt = f"With a {prompt} tone, please read: '{text}'"
try:
logging.info(f"[{session_id}] (Live) تلاش {attempt+1} با کلید ...{selected_api_key[-4:]}")
audio_buffer = bytearray()
async with client.aio.live.connect(model=FIXED_MODEL_NAME_LIVE, config=live_config) as session:
await session.send(input=tts_prompt, end_of_turn=True)
async for response in session.receive():
if response.data: audio_buffer.extend(response.data)
if len(audio_buffer) > 0:
logging.info(f"[{session_id}] ✅ (Live) موفقیت‌آمیز.")
return audio_buffer
else: raise Exception("بافر صوتی خالی بود.")
except Exception as e:
logging.warning(f"[{session_id}] ⚠️ (Live) خطا در تلاش {attempt+1}: {e}")
time.sleep(0.5)
return None
def save_pcm_to_wav(pcm_data, output_path):
try:
with wave.open(output_path, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(24000)
wf.writeframes(pcm_data)
return True
except Exception as e:
logging.error(f"خطا در تبدیل PCM به WAV: {e}")
return False
# --- منطق Gemini Standard (اصلاح شده با retry_limit) ---
def generate_audio_chunk_standard_with_retry(chunk_text, prompt_text, voice, temp, session_id, retry_limit):
if not ALL_API_KEYS: raise Exception("هیچ کلید API در دسترس نیست.")
# استفاده از محدودیت تعیین شده توسط Manager
MAX_RETRIES = retry_limit
for attempt in range(MAX_RETRIES):
selected_api_key, client = get_random_api_key_and_client()
if not client: break
try:
# logging.info(f"[{session_id}] (Standard) تلاش {attempt+1}/{MAX_RETRIES} با کلید ...{selected_api_key[-4:]}")
final_text = f'{chunk_text}({prompt_text})' if prompt_text and prompt_text.strip() else chunk_text
contents = [types.Content(role="user", parts=[types.Part.from_text(text=final_text)])]
config = types.GenerateContentConfig(temperature=temp, response_modalities=["audio"],
speech_config=types.SpeechConfig(voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=voice))))
response = client.models.generate_content(model=FIXED_MODEL_NAME_STANDARD, contents=contents, config=config)
if response.candidates and response.candidates[0].content and response.candidates[0].content.parts and response.candidates[0].content.parts[0].inline_data:
logging.info(f"[{session_id}] ✅ (Standard) موفقیت در تلاش {attempt+1}.")
return response.candidates[0].content.parts[0].inline_data
except Exception as e:
logging.warning(f"[{session_id}] ⚠️ (Standard) خطا در تلاش {attempt+1}: {e}")
time.sleep(0.5)
return None
def core_generate_audio(text_input, prompt_input, selected_voice, temperature_val, session_id, use_live_model=False, retry_limit=50, fallback_to_live=False):
logging.info(f"[{session_id}] 🚀 شروع: Live={use_live_model}, Retry={retry_limit}, Fallback={fallback_to_live}")
temp_dir = f"temp_{session_id}"
os.makedirs(temp_dir, exist_ok=True)
output_base_name = f"{temp_dir}/audio_session_{session_id}"
final_output_path = f"output_{session_id}.wav"
try:
# 1. اگر دستور مستقیم استفاده از لایف باشد (مثلاً کاربر رایگان)
if use_live_model:
pcm_data = asyncio.run(generate_audio_live_with_retry(text_input, prompt_input, selected_voice, session_id))
if pcm_data and save_pcm_to_wav(pcm_data, final_output_path):
return final_output_path
else:
raise Exception("تولید صدا با مدل لایف ناموفق بود.")
# 2. استفاده از مدل استاندارد
else:
# تقسیم‌بندی هوشمند حذف شده و فقط یک چانک (کل متن) برمی‌گرداند
text_chunks = smart_text_split(text_input, DEFAULT_MAX_CHUNK_SIZE)
generated_files = []
standard_failed = False
for i, chunk in enumerate(text_chunks):
# تلاش با مدل استاندارد به تعداد retry_limit
inline_data = generate_audio_chunk_standard_with_retry(chunk, prompt_input, selected_voice, temperature_val, session_id, retry_limit)
if inline_data:
data_buffer = inline_data.data
ext = mimetypes.guess_extension(inline_data.mime_type) or ".wav"
if "audio/L" in inline_data.mime_type and ext == ".wav":
data_buffer = convert_to_wav(data_buffer, inline_data.mime_type)
if not ext.startswith("."): ext = "." + ext
fpath = save_binary_file(f"{output_base_name}_part{i+1:03d}{ext}", data_buffer)
if fpath: generated_files.append(fpath)
else:
standard_failed = True
break # شکست در تولید یکی از چانک‌ها (در اینجا کل متن)
# 3. بررسی شکست و Fallback
if standard_failed:
if fallback_to_live:
logging.info(f"[{session_id}] 🔄 مدل استاندارد شکست خورد. سوییچ به مدل لایف (Fallback)...")
generated_files = []
# فراخوانی مدل لایف برای کل متن
pcm_data = asyncio.run(generate_audio_live_with_retry(text_input, prompt_input, selected_voice, session_id))
if pcm_data and save_pcm_to_wav(pcm_data, final_output_path):
return final_output_path
else:
raise Exception("هم مدل استاندارد و هم مدل لایف (Fallback) شکست خوردند.")
else:
raise Exception(f"تولید صدا با مدل استاندارد پس از {retry_limit} تلاش ناموفق بود.")
# اگر استاندارد موفق بود، فایل‌ها را ادغام کن (در اینجا معمولاً فقط یک فایل است)
if not generated_files: raise Exception("هیچ فایلی تولید نشد.")
if len(generated_files) > 1:
if PYDUB_AVAILABLE and merge_audio_files_func(generated_files, final_output_path):
pass
else:
shutil.move(generated_files[0], final_output_path)
else:
shutil.move(generated_files[0], final_output_path)
return final_output_path
finally:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
_init_api_keys()
app = FastAPI(title="Alpha TTS Worker API")
class TTSRequest(BaseModel):
text: str
prompt: str | None = ""
speaker: str
temperature: float
use_live_model: bool = False
retry_limit: int = 50 # پارامتر جدید
fallback_to_live: bool = False # پارامتر جدید
@app.post("/generate")
def generate_audio_endpoint(request: TTSRequest):
session_id = str(uuid.uuid4())[:8]
try:
final_path = core_generate_audio(
text_input=request.text,
prompt_input=request.prompt,
selected_voice=request.speaker,
temperature_val=request.temperature,
session_id=session_id,
use_live_model=request.use_live_model,
retry_limit=request.retry_limit,
fallback_to_live=request.fallback_to_live
)
if final_path and os.path.exists(final_path):
from fastapi.responses import FileResponse
return FileResponse(path=final_path, media_type='audio/wav', filename=os.path.basename(final_path), background=shutil.rmtree(os.path.dirname(final_path), ignore_errors=True))
else:
raise HTTPException(status_code=500, detail="خطا در تولید فایل صوتی.")
except Exception as e:
logging.error(f"[{session_id}] ❌ خطا: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
def health_check():
return {"status": "ok", "message": "TTS Worker is running."}
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port, reload=False)