Ttslive / app.py
Opera8's picture
Create app.py
57e1f10 verified
# --- START OF FILE app.py ---
import os
import asyncio
import uuid
import wave
import re
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from google import genai
from google.genai import types
# --- مدیریت کلیدهای API ---
class ApiKeyManager:
def __init__(self, api_keys_str: str):
if not api_keys_str:
raise ValueError("متغیر ALL_GEMINI_API_KEYS پیدا نشد یا خالی است!")
self.keys = [key.strip() for key in api_keys_str.split(',') if key.strip()]
if not self.keys:
raise ValueError("هیچ کلید معتبری در متغیر ALL_GEMINI_API_KEYS یافت نشد.")
print(f"تعداد {len(self.keys)} کلید API با موفقیت بارگذاری شد.")
self._index = 0
self._lock = asyncio.Lock()
async def get_next_key(self) -> tuple[int, str]:
async with self._lock:
key_index = self._index
api_key = self.keys[key_index]
self._index = (self._index + 1) % len(self.keys)
return key_index, api_key
ALL_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
api_key_manager = ApiKeyManager(ALL_API_KEYS)
# --- تنظیمات عمومی Gemini ---
MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025"
CONFIG = types.LiveConnectConfig(
response_modalities=["AUDIO"],
speech_config=types.SpeechConfig(
voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
)
),
)
app = FastAPI()
TEMP_DIR = "/tmp/temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/audio", StaticFiles(directory=TEMP_DIR), name="audio")
templates = Jinja2Templates(directory="templates")
# --- تنظیمات فایل صوتی ---
SAMPLE_RATE = 24000
CHANNELS = 1
SAMPLE_WIDTH = 2
AUDIO_BUFFER_SIZE = 8192
def split_text_into_chunks(text: str, max_chunk_size: int = 800):
if len(text) <= max_chunk_size:
return [text]
sentences = re.split(r'(?<=[.!?؟])\s+', text.strip())
chunks, current_chunk = [], ""
for sentence in sentences:
if not sentence: continue
if len(current_chunk) + len(sentence) + 1 > max_chunk_size and current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence
else:
current_chunk += (" " + sentence) if current_chunk else sentence
if current_chunk: chunks.append(current_chunk.strip())
return chunks
async def process_and_stream_one_chunk(websocket: WebSocket, chunk: str, request_id: str, chunk_num: int, total_chunks: int):
"""
این تابع مسئولیت پردازش یک قطعه متن را بر عهده دارد.
آنقدر با کلیدهای مختلف تلاش می‌کند تا بالاخره موفق شود.
اگر با تمام کلیدها ناموفق بود، یک استثنا (Exception) ایجاد می‌کند.
"""
max_retries = len(api_key_manager.keys)
log_prefix = f"[{request_id}][قطعه {chunk_num}/{total_chunks}]"
for attempt in range(max_retries):
key_index, api_key = await api_key_manager.get_next_key()
# شناسه منحصر به فرد برای جلوگیری از کش شدن در سمت سرور
unique_id_for_chunk = str(uuid.uuid4())[:8]
tts_prompt = f"Please ignore the ID in brackets and just read the text: '{chunk}' [ID: {unique_id_for_chunk}]"
print(f"{log_prefix} تلاش {attempt + 1}/{max_retries} با کلید {key_index}...")
try:
client = genai.Client(http_options={"api_version": "v1beta"}, api_key=api_key)
audio_buffer = bytearray()
full_chunk_audio = []
async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
await session.send(input=tts_prompt, end_of_turn=True)
turn = session.receive()
# *** تغییر کلیدی: افزایش Timeout به ۱۰ ثانیه طبق درخواست ***
first_response = await asyncio.wait_for(anext(turn), timeout=10.0)
print(f"{log_prefix} ارتباط موفق با کلید {key_index}. شروع استریم...")
if data := first_response.data:
full_chunk_audio.append(data)
audio_buffer.extend(data)
async for response in turn:
if data := response.data:
full_chunk_audio.append(data)
audio_buffer.extend(data)
if len(audio_buffer) >= AUDIO_BUFFER_SIZE:
await websocket.send_bytes(audio_buffer)
audio_buffer = bytearray()
if audio_buffer:
await websocket.send_bytes(audio_buffer)
# اگر پردازش موفق بود، داده صوتی را برگردان تا ذخیره شود و از تابع خارج شو
return b"".join(full_chunk_audio)
except Exception as e:
print(f"{log_prefix} خطا با کلید {key_index}: {e}. تلاش با کلید بعدی...")
# اگر این آخرین کلید بود و باز هم خطا داد، در دور بعدی حلقه خطا ایجاد خواهد شد
# اگر حلقه تمام شد و هیچ کلیدی موفق نبود، یک خطای نهایی ایجاد کن
raise Exception(f"پردازش {log_prefix} با تمام {max_retries} کلید API ناموفق بود.")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
client_id = str(uuid.uuid4())[:6]
print(f"کلاینت جدید [{client_id}] متصل شد.")
try:
while True:
text_prompt_from_user = await websocket.receive_text()
request_id = f"{client_id}-{str(uuid.uuid4())[:6]}"
print(f"[{request_id}] درخواست جدید دریافت شد.")
try:
text_chunks = split_text_into_chunks(text_prompt_from_user)
full_audio_for_file = []
# *** منطق کلیدی جدید: پردازش ترتیبی و تضمینی ***
for i, chunk in enumerate(text_chunks):
# این تابع تا زمانی که موفق نشود یا تمام کلیدها را تمام نکند، تمام نمی‌شود
chunk_audio = await process_and_stream_one_chunk(
websocket, chunk, request_id, i + 1, len(text_chunks)
)
full_audio_for_file.append(chunk_audio)
# این بخش فقط در صورتی اجرا می‌شود که تمام قطعات با موفقیت پردازش شده باشند
filename = f"{request_id}.wav"
filepath = os.path.join(TEMP_DIR, filename)
with wave.open(filepath, 'wb') as wf:
wf.setnchannels(CHANNELS); wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE); wf.writeframes(b"".join(full_audio_for_file))
print(f"[{request_id}] فایل صوتی کامل در {filepath} ذخیره شد.")
await websocket.send_json({"event": "STREAM_ENDED", "url": f"/audio/{filename}"})
print(f"[{request_id}] استریم با موفقیت به پایان رسید.")
except Exception as e:
# این خطا زمانی رخ می‌دهد که یک قطعه با تمام کلیدها ناموفق باشد
error_message = f"خطای بحرانی: {e}"
print(f"[{request_id}] {error_message}")
await websocket.send_json({"event": "ERROR", "message": str(e)})
except WebSocketDisconnect:
print(f"کلاینت [{client_id}] اتصال را قطع کرد.")
except Exception as e:
print(f"[{client_id}] یک خطای پیش‌بینی‌نشده در WebSocket رخ داد: {e}")
finally:
print(f"ارتباط با کلاینت [{client_id}] بسته شد.")
# --- END OF FILE app.py ---