File size: 8,625 Bytes
57e1f10 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | # --- START OF FILE app.py ---
import os
import asyncio
import uuid
import wave
import re
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from google import genai
from google.genai import types
# --- مدیریت کلیدهای API ---
class ApiKeyManager:
def __init__(self, api_keys_str: str):
if not api_keys_str:
raise ValueError("متغیر ALL_GEMINI_API_KEYS پیدا نشد یا خالی است!")
self.keys = [key.strip() for key in api_keys_str.split(',') if key.strip()]
if not self.keys:
raise ValueError("هیچ کلید معتبری در متغیر ALL_GEMINI_API_KEYS یافت نشد.")
print(f"تعداد {len(self.keys)} کلید API با موفقیت بارگذاری شد.")
self._index = 0
self._lock = asyncio.Lock()
async def get_next_key(self) -> tuple[int, str]:
async with self._lock:
key_index = self._index
api_key = self.keys[key_index]
self._index = (self._index + 1) % len(self.keys)
return key_index, api_key
ALL_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
api_key_manager = ApiKeyManager(ALL_API_KEYS)
# --- تنظیمات عمومی Gemini ---
MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025"
CONFIG = types.LiveConnectConfig(
response_modalities=["AUDIO"],
speech_config=types.SpeechConfig(
voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
)
),
)
app = FastAPI()
TEMP_DIR = "/tmp/temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/audio", StaticFiles(directory=TEMP_DIR), name="audio")
templates = Jinja2Templates(directory="templates")
# --- تنظیمات فایل صوتی ---
SAMPLE_RATE = 24000
CHANNELS = 1
SAMPLE_WIDTH = 2
AUDIO_BUFFER_SIZE = 8192
def split_text_into_chunks(text: str, max_chunk_size: int = 800):
if len(text) <= max_chunk_size:
return [text]
sentences = re.split(r'(?<=[.!?؟])\s+', text.strip())
chunks, current_chunk = [], ""
for sentence in sentences:
if not sentence: continue
if len(current_chunk) + len(sentence) + 1 > max_chunk_size and current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence
else:
current_chunk += (" " + sentence) if current_chunk else sentence
if current_chunk: chunks.append(current_chunk.strip())
return chunks
async def process_and_stream_one_chunk(websocket: WebSocket, chunk: str, request_id: str, chunk_num: int, total_chunks: int):
"""
این تابع مسئولیت پردازش یک قطعه متن را بر عهده دارد.
آنقدر با کلیدهای مختلف تلاش میکند تا بالاخره موفق شود.
اگر با تمام کلیدها ناموفق بود، یک استثنا (Exception) ایجاد میکند.
"""
max_retries = len(api_key_manager.keys)
log_prefix = f"[{request_id}][قطعه {chunk_num}/{total_chunks}]"
for attempt in range(max_retries):
key_index, api_key = await api_key_manager.get_next_key()
# شناسه منحصر به فرد برای جلوگیری از کش شدن در سمت سرور
unique_id_for_chunk = str(uuid.uuid4())[:8]
tts_prompt = f"Please ignore the ID in brackets and just read the text: '{chunk}' [ID: {unique_id_for_chunk}]"
print(f"{log_prefix} تلاش {attempt + 1}/{max_retries} با کلید {key_index}...")
try:
client = genai.Client(http_options={"api_version": "v1beta"}, api_key=api_key)
audio_buffer = bytearray()
full_chunk_audio = []
async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
await session.send(input=tts_prompt, end_of_turn=True)
turn = session.receive()
# *** تغییر کلیدی: افزایش Timeout به ۱۰ ثانیه طبق درخواست ***
first_response = await asyncio.wait_for(anext(turn), timeout=10.0)
print(f"{log_prefix} ارتباط موفق با کلید {key_index}. شروع استریم...")
if data := first_response.data:
full_chunk_audio.append(data)
audio_buffer.extend(data)
async for response in turn:
if data := response.data:
full_chunk_audio.append(data)
audio_buffer.extend(data)
if len(audio_buffer) >= AUDIO_BUFFER_SIZE:
await websocket.send_bytes(audio_buffer)
audio_buffer = bytearray()
if audio_buffer:
await websocket.send_bytes(audio_buffer)
# اگر پردازش موفق بود، داده صوتی را برگردان تا ذخیره شود و از تابع خارج شو
return b"".join(full_chunk_audio)
except Exception as e:
print(f"{log_prefix} خطا با کلید {key_index}: {e}. تلاش با کلید بعدی...")
# اگر این آخرین کلید بود و باز هم خطا داد، در دور بعدی حلقه خطا ایجاد خواهد شد
# اگر حلقه تمام شد و هیچ کلیدی موفق نبود، یک خطای نهایی ایجاد کن
raise Exception(f"پردازش {log_prefix} با تمام {max_retries} کلید API ناموفق بود.")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
client_id = str(uuid.uuid4())[:6]
print(f"کلاینت جدید [{client_id}] متصل شد.")
try:
while True:
text_prompt_from_user = await websocket.receive_text()
request_id = f"{client_id}-{str(uuid.uuid4())[:6]}"
print(f"[{request_id}] درخواست جدید دریافت شد.")
try:
text_chunks = split_text_into_chunks(text_prompt_from_user)
full_audio_for_file = []
# *** منطق کلیدی جدید: پردازش ترتیبی و تضمینی ***
for i, chunk in enumerate(text_chunks):
# این تابع تا زمانی که موفق نشود یا تمام کلیدها را تمام نکند، تمام نمیشود
chunk_audio = await process_and_stream_one_chunk(
websocket, chunk, request_id, i + 1, len(text_chunks)
)
full_audio_for_file.append(chunk_audio)
# این بخش فقط در صورتی اجرا میشود که تمام قطعات با موفقیت پردازش شده باشند
filename = f"{request_id}.wav"
filepath = os.path.join(TEMP_DIR, filename)
with wave.open(filepath, 'wb') as wf:
wf.setnchannels(CHANNELS); wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE); wf.writeframes(b"".join(full_audio_for_file))
print(f"[{request_id}] فایل صوتی کامل در {filepath} ذخیره شد.")
await websocket.send_json({"event": "STREAM_ENDED", "url": f"/audio/{filename}"})
print(f"[{request_id}] استریم با موفقیت به پایان رسید.")
except Exception as e:
# این خطا زمانی رخ میدهد که یک قطعه با تمام کلیدها ناموفق باشد
error_message = f"خطای بحرانی: {e}"
print(f"[{request_id}] {error_message}")
await websocket.send_json({"event": "ERROR", "message": str(e)})
except WebSocketDisconnect:
print(f"کلاینت [{client_id}] اتصال را قطع کرد.")
except Exception as e:
print(f"[{client_id}] یک خطای پیشبینینشده در WebSocket رخ داد: {e}")
finally:
print(f"ارتباط با کلاینت [{client_id}] بسته شد.")
# --- END OF FILE app.py --- |