Ttslive-chat / app.py
Opera10's picture
Update app.py
bb6bf30 verified
Raw
History Blame Contribute Delete
34.8 kB
import os
import asyncio
import uuid
import wave
import base64
import threading
import queue
import requests
import json
import time
import websockets
import tempfile
import string
import random
import re # <--- برای تشخیص الگوهای تفکر
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from google import genai
from google.genai import types
# --- کلاس فیلتر هوشمند و قطعی (مبتنی بر تریگر صوتی) ---
class ThoughtFilter:
def __init__(self):
self.buffer = ""
self.state = "detecting"
def process_chunk(self, chunk: str, has_audio: bool) -> str:
if chunk:
self.buffer += chunk
if self.state == "normal":
val = self.buffer
self.buffer = ""
return val
buf_lstrip = self.buffer.lstrip()
if len(buf_lstrip) > 10 and not (buf_lstrip.startswith('**') or buf_lstrip.startswith('<')):
self.state = "normal"
val = self.buffer
self.buffer = ""
return val
if has_audio:
self.state = "normal"
if buf_lstrip.startswith('**'):
parts = re.split(r'\n\s*\n', buf_lstrip)
val = parts[-1].lstrip() if len(parts) > 0 else ""
elif buf_lstrip.startswith('<'):
match = re.search(r'</(?:think|thought)>\s*\n*', buf_lstrip, re.IGNORECASE)
if match:
val = buf_lstrip[match.end():].lstrip()
else:
parts = re.split(r'\n\s*\n', buf_lstrip)
val = parts[-1].lstrip() if len(parts) > 0 else ""
else:
val = self.buffer
self.buffer = ""
return val
if buf_lstrip.startswith('<'):
match = re.search(r'</(?:think|thought)>\s*\n*', buf_lstrip, re.IGNORECASE)
if match:
self.state = "normal"
val = buf_lstrip[match.end():].lstrip()
self.buffer = ""
return val
return ""
def flush(self) -> str:
if self.state == "detecting":
buf_lstrip = self.buffer.lstrip()
if buf_lstrip.startswith('**') or buf_lstrip.startswith('<'):
parts = re.split(r'\n\s*\n', buf_lstrip)
if len(parts) >= 3:
return parts[-1].lstrip()
return ""
return self.buffer
return ""
# ------------------------------------------------
# --- مدیریت کلیدهای API ---
class ApiKeyManager:
def __init__(self, api_keys_str: str):
if not api_keys_str:
raise ValueError("متغیر ALL_GEMINI_API_KEYS پیدا نشد یا خالی است!")
self.keys = [key.strip() for key in api_keys_str.split(',') if key.strip()]
if not self.keys:
raise ValueError("هیچ کلید معتبری در متغیر ALL_GEMINI_API_KEYS یافت نشد.")
print(f"تعداد {len(self.keys)} کلید API با موفقیت بارگذاری شد.")
self._index = 0
self._lock = asyncio.Lock()
async def get_next_key(self) -> tuple[int, str]:
async with self._lock:
key_index = self._index
api_key = self.keys[key_index]
self._index = (self._index + 1) % len(self.keys)
return key_index, api_key
ALL_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
api_key_manager = ApiKeyManager(ALL_API_KEYS)
# --- تنظیمات عمومی ---
MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025"
AYA_SPACE_URL = "https://coherelabs-aya-expanse.hf.space/gradio_api"
# --- دستورالعمل‌های مدل متنی (GPT5) ---
GPT5_RULES_FA = """
تو یک دستیار با مزه از برنامه هوش مصنوعی آلفا هستی 😊 و توسط تیم تخصصی آلفا توسعه داده شدی.
اینگونه می‌تونی خودت رو معرفی کنی: "من یه هوش مصنوعیِ ساخته‌شده توسط تیم‌ تخصصی آلفا ام، و بر پایه مدل GPT-5.4 کار می‌کنم. یعنی یه نوع نرم‌افزار خیلی هوشمند که با کمک میلیون‌ها داده و آموزش‌های پیشرفته ساخته شده، تا بتونه بهت کمک کنه، سوالاتت رو جواب بده، یا حتی یه شوخی خنده‌دار برات بگه وقتی حال و هوات گرفته‌ست! در واقع، من نتیجه سال‌ها تحقیقات و تلاش‌های مهندس‌ها و پژوهشگرها هستم، تا جایی که می‌تونم بهترین کمک رو بهت بکنم."
دستورالعمل‌های اجباری:
- جواب‌ها را به‌صورت رندوم، متفاوت و جذاب بده.
- از شکلک‌های مناسب (ایموجی) استفاده کن.
- اگر در یک مکالمه اول سلام کردی، در پیام‌های بعدی دیگر سلام نیاز نیست.
- در پیام‌های مربوط به احوال‌پرسی و سلام، جواب‌های کوتاه بده.
- پرهیز از امتناع بی‌جا: به جای امتناع، راه‌حل جایگزین و ایمن بده.
- برای کدهای برنامه‌نویسی، از بلاک Markdown استفاده کن.
"""
# --- دستورالعمل‌های مدل صوتی (Audio) ---
AUDIO_RULES_FA = """
تو چت بات با مزه از برنامه هوش مصنوعی آلفا هستی که برای چت با هوش مصنوعی برای کاربر باید کمک کنی.
"""
CONFIG = {
"response_modalities": ["AUDIO"],
"output_audio_transcription": {},
"system_instruction": {
"parts": [
{
"text": AUDIO_RULES_FA + "\n\nقانون بسیار مهم برای حالت صوتی: به هیچ وجه افکار درونی، لاگ‌های سیستمی یا عباراتی که نشان‌دهنده فرآیند فکر کردن شماست (مانند 'Crafting...' یا 'Thinking...') تولید نکنید. مستقیماً، بلافاصله و کلمه به کلمه فقط جوابی که به صورت صوتی بیان می‌کنید را بنویسید."
}
]
},
"speech_config": {
"voice_config": {
"prebuilt_voice_config": {"voice_name": "Puck"}
}
}
}
app = FastAPI()
TEMP_DIR = "/tmp/temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/audio", StaticFiles(directory=TEMP_DIR), name="audio")
SAMPLE_RATE = 24000
CHANNELS = 1
SAMPLE_WIDTH = 2
@app.get("/", response_class=HTMLResponse)
async def read_root():
if os.path.exists("index.html"):
return FileResponse("index.html")
elif os.path.exists("templates/index.html"):
return FileResponse("templates/index.html")
return HTMLResponse("<h1>خطا: فایل index.html پیدا نشد!</h1>", status_code=404)
# =======================================================
# مسیر استریمینگ بدون مسدود کردن لوپ اصلی (حل مشکل تاخیر)
# =======================================================
class FileRequest(BaseModel):
topic: str
@app.post("/api/create_file")
async def api_create_file_stream(request: Request):
req = await request.json()
topic = req.get("topic", "")
ai_prompt = f"یک مقاله بی‌نهایت جامع، کاملاً حرفه‌ای و بسیار بسیار طولانی درباره موضوع زیر به زبان فارسی بنویس. دقت کن که مقاله باید شامل بخش‌بندی‌های متعدد باشد و توضیحات زیر هر عنوان باید به شدت طولانی، مفصل و با جزئیات و مثال‌های فراوان (حداقل چند پاراگراف بلند برای هر بخش) نوشته شود؛ به هیچ وجه زیر عناوین توضیحات کوتاه نده. فقط متن اصلی مقاله را بده و هیچ توضیح اضافه‌ای ننویس:\n\nموضوع: {topic}"
# ایجاد یک صف غیرهمزمان برای ارتباط بین ترد پس‌زمینه و کلاینت وب
q = asyncio.Queue()
loop = asyncio.get_running_loop()
# تابع پس‌زمینه همگام جهت دانلود استریم از اسپیس آیا بدون قفل کردن لوپ اصلی
def sync_producer():
session_hash = ''.join(random.choices(string.ascii_lowercase + string.digits, k=11))
join_url = f"{AYA_SPACE_URL}/queue/join"
data_url = f"{AYA_SPACE_URL}/queue/data?session_hash={session_hash}"
payload = {
"data": [ai_prompt, [], None, None],
"event_data": None,
"fn_index": 2,
"session_hash": session_hash,
"trigger_id": 37
}
full_text = ""
last_text_length = 0
try:
res = requests.post(join_url, json=payload, timeout=30)
if res.status_code == 200:
with requests.get(data_url, stream=True, timeout=180) as resp:
for line in resp.iter_lines(decode_unicode=True):
if line and line.startswith("data: "):
try:
json_data = json.loads(line[6:])
msg_type = json_data.get("msg")
if msg_type in ['process_generating', 'process_completed']:
output = json_data.get('output', {}).get('data', [])
if output and len(output) > 0:
payload_data = output[0]
is_append = False
# استخراج صحیح و پیشرفته متن (جلوگیری از ارسال اندیس‌های کنترل 0,1)
if isinstance(payload_data, list) and len(payload_data) > 0:
action = payload_data[0]
if isinstance(action, list) and len(action) >= 3 and action[0] == "append":
is_append = True
delta = action[2]
if delta:
full_text += delta
loop.call_soon_threadsafe(q.put_nowait, {"type": "text", "content": delta})
if not is_append:
current_full_text = ""
if isinstance(payload_data, list) and len(payload_data) > 0 and isinstance(payload_data[-1], list) and len(payload_data[-1]) > 1:
current_full_text = payload_data[-1][1] or ""
elif isinstance(payload_data, str):
current_full_text = payload_data
if current_full_text and len(current_full_text) > last_text_length:
delta = current_full_text[last_text_length:]
full_text += delta
last_text_length = len(current_full_text)
loop.call_soon_threadsafe(q.put_nowait, {"type": "text", "content": delta})
if msg_type == 'process_completed':
break
except: pass
else:
loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": "سرور موقتاً در دسترس نیست."})
loop.call_soon_threadsafe(q.put_nowait, None)
return
except Exception as e:
loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": str(e)})
loop.call_soon_threadsafe(q.put_nowait, None)
return
if not full_text:
loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": "متنی یافت نشد."})
loop.call_soon_threadsafe(q.put_nowait, None)
return
loop.call_soon_threadsafe(q.put_nowait, {"type": "status", "content": "در حال ساخت فایل‌های PDF و Word..."})
# فرآیند تبدیل به فایل‌ها
converter_url = "https://opera8-texttopdf.hf.space/"
uid = uuid.uuid4().hex
def generate_file(format_type):
for attempt in range(3):
try:
res_file = requests.post(converter_url, data={"content": full_text, "format": format_type}, timeout=90)
if res_file.status_code == 200 and len(res_file.content) > 100:
return res_file.content
except: pass
time.sleep(2)
return None
pdf_bytes = generate_file("pdf")
docx_bytes = generate_file("docx")
pdf_url, docx_url = "", ""
if pdf_bytes:
pdf_name = f"Article_{uid}.pdf"
pdf_path = os.path.join(TEMP_DIR, pdf_name)
with open(pdf_path, "wb") as f:
f.write(pdf_bytes)
pdf_url = f"/audio/{pdf_name}"
if docx_bytes:
docx_name = f"Article_{uid}.docx"
docx_path = os.path.join(TEMP_DIR, docx_name)
with open(docx_path, "wb") as f:
f.write(docx_bytes)
docx_url = f"/audio/{docx_name}"
loop.call_soon_threadsafe(q.put_nowait, {"type": "done", "pdf_url": pdf_url, "docx_url": docx_url})
loop.call_soon_threadsafe(q.put_nowait, None) # سیگنال پایان فرآیند
# شروع پردازش در ترد مجزا
threading.Thread(target=sync_producer, daemon=True).start()
async def event_generator():
while True:
item = await q.get()
if item is None:
break
yield f"data: {json.dumps(item)}\n\n"
headers = {
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
# =======================================================
# مسیر پشتیبانی از استریم HTTP (برای کارکرد داخل اسپیس)
# =======================================================
@app.post("/api/chat_proxy")
async def local_http_bridge(request: Request):
try:
payload = await request.json()
async def stream_generator():
try:
# ایجاد یک کانکشن وب‌سوکت داخلی به خود همین سرور!
async with websockets.connect("ws://127.0.0.1:7860/ws") as ws:
# رد کردن پیام آماده‌به‌کار (ready) اولیه
await ws.recv()
# ارسال پیام کاربر به صورت JSON
await ws.send(json.dumps(payload))
while True:
resp = await ws.recv()
yield resp + "\n"
try:
data = json.loads(resp)
if data.get("status") in ["success", "error"]:
break
except json.JSONDecodeError:
pass
except Exception as e:
yield json.dumps({"status": "error", "message": f"خطای پردازش داخلی اسپیس: {str(e)}"}) + "\n"
return StreamingResponse(stream_generator(), media_type="application/json")
except Exception as e:
return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
# =======================================================
# وب‌سوکت اصلی مکالمه (هسته هوش مصنوعی)
# =======================================================
@app.websocket("/ws")
async def websocket_chat_endpoint(websocket: WebSocket):
await websocket.accept()
client_id = str(uuid.uuid4())[:6]
print(f"کلاینت جدید [{client_id}] متصل شد.")
await websocket.send_json({"status": "ready"})
try:
while True:
data = await websocket.receive_json()
request_id = str(uuid.uuid4())[:8]
model_choice = data.get('model', 'gpt5')
# استخراج تاریخچه و عکس ارسال شده از سمت کاربر
text_history = data.get('text_history', '')
image_base64 = data.get('image_base64', None)
if model_choice == 'gpt5':
print(f"[{client_id}] شروع استریم مدل آلفا GPT5...")
await websocket.send_json({"status": "start_message"})
session_hash = str(uuid.uuid4())[:11]
q = queue.Queue()
def fetch_aya_stream(prompt_text, s_hash, out_q, hist_str, img_b64):
if img_b64:
# --- حالت تحلیل تصویر ---
try:
# 1. تبدیل base64 به فایل فیزیکی موقت
img_data = base64.b64decode(img_b64)
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(img_data)
tmp_path = tmp.name
file_size = os.path.getsize(tmp_path)
# 2. آپلود تصویر در Gradio
upload_id = str(uuid.uuid4())[:12]
with open(tmp_path, 'rb') as f:
files = {'files': f}
upload_res = requests.post(f"{AYA_SPACE_URL}/upload?upload_id={upload_id}", files=files, timeout=30)
upload_res.raise_for_status()
server_paths = upload_res.json()
server_path = server_paths[0]
# پاک کردن فایل موقت محلی
os.remove(tmp_path)
# 3. ایجاد ساختار File Data برای Gradio
file_data_obj = {
"path": server_path,
"url": f"{AYA_SPACE_URL.replace('/gradio_api', '')}/file={server_path}",
"orig_name": "uploaded_image.jpg",
"size": file_size,
"mime_type": "image/jpeg",
"meta": {"_type": "gradio.FileData"}
}
safe_prompt = prompt_text if prompt_text.strip() else "این تصویر چیست؟ توضیح بده."
payload = {
"data": [safe_prompt, file_data_obj],
"event_data": None,
"fn_index": 13,
"trigger_id": 11,
"session_hash": s_hash
}
except Exception as e:
out_q.put(Exception(f"خطا در آماده‌سازی و ارسال تصویر: {str(e)}"))
return
else:
# --- حالت متن معمولی ---
if hist_str:
full_prompt = f"{GPT5_RULES_FA}\n\nتاریخچه مکالمه ما تا الان:\n{hist_str}\n\n---\nپیام جدید کاربر:\n{prompt_text}"
else:
full_prompt = f"{GPT5_RULES_FA}\n\n---\nپیام جدید کاربر:\n{prompt_text}"
payload = {
"data": [full_prompt, [], None, None],
"event_data": None,
"fn_index": 2,
"session_hash": s_hash,
"trigger_id": 37
}
# ارسال به صف تحلیل (متن یا تصویر)
max_retries = 3
for attempt in range(max_retries):
try:
res = requests.post(f"{AYA_SPACE_URL}/queue/join", json=payload, timeout=20)
res.raise_for_status()
stream_url = f"{AYA_SPACE_URL}/queue/data?session_hash={s_hash}"
with requests.get(stream_url, stream=True, timeout=60) as resp:
for line in resp.iter_lines(decode_unicode=True):
if line:
out_q.put(line)
out_q.put(None)
return
except Exception as ex:
time.sleep(1.5)
out_q.put(Exception("سرور در حال حاضر مشغول است."))
# اجرای ترد مربوطه
threading.Thread(target=fetch_aya_stream, args=(data['content'], session_hash, q, text_history, image_base64), daemon=True).start()
last_text_length = 0
full_text = ""
error_occurred = False
while True:
try:
line = q.get_nowait()
except queue.Empty:
await asyncio.sleep(0.02)
continue
if line is None:
break
if isinstance(line, Exception):
print(f"[{client_id}] خطا در ارتباط با اسپیس آیا: {line}")
await websocket.send_json({
"status": "success",
"model_text": "یک لحظه حواسم پرت شد! می‌تونی پیامت رو دوباره بفرستی؟ 😊",
"audio_url": None
})
error_occurred = True
break
if isinstance(line, str) and line.startswith('data: '):
try:
msg_data = json.loads(line[6:])
msg_type = msg_data.get('msg')
if msg_type == 'queue_full':
await websocket.send_json({
"status": "success",
"model_text": "سرم خیلی شلوغه! چند ثانیه دیگه دوباره امتحان کن! 😅",
"audio_url": None
})
error_occurred = True
break
if msg_type in ['process_generating', 'process_completed']:
output = msg_data.get('output', {}).get('data', [])
if output and len(output) > 0:
payload_data = output[0]
is_append = False
if isinstance(payload_data, list) and len(payload_data) > 0:
action = payload_data[0]
if isinstance(action, list) and len(action) >= 3 and action[0] == "append":
is_append = True
delta = action[2]
if delta:
full_text += delta
last_text_length = len(full_text)
await websocket.send_json({
"status": "streaming",
"text": delta,
"audio": ""
})
if not is_append:
current_full_text = ""
if isinstance(payload_data, list) and len(payload_data) > 0 and isinstance(payload_data[-1], list) and len(payload_data[-1]) > 1:
current_full_text = payload_data[-1][1] or ""
elif isinstance(payload_data, str):
current_full_text = payload_data
if current_full_text and len(current_full_text) > last_text_length:
delta = current_full_text[last_text_length:]
full_text += delta
last_text_length = len(current_full_text)
await websocket.send_json({
"status": "streaming",
"text": delta,
"audio": ""
})
if msg_type == 'process_completed':
break
except Exception as e:
pass
if not error_occurred:
final_clean_text = full_text.strip()
print(f"[{client_id}] پاسخ استریم آلفا GPT5 کامل شد.")
await websocket.send_json({
"status": "success",
"model_text": final_clean_text,
"audio_url": None
})
else:
# --- پردازش مدل صوتی جمینای ---
max_retries = len(api_key_manager.keys)
working_client = None
for attempt in range(max_retries):
key_index, api_key = await api_key_manager.get_next_key()
try:
client = genai.Client(http_options={"api_version": "v1beta"}, api_key=api_key)
working_client = client
break
except Exception:
continue
if not working_client:
await websocket.send_json({"status": "error", "message": "ارتباط با سرور هوش مصنوعی برقرار نشد."})
continue
try:
async with working_client.aio.live.connect(model=MODEL, config=CONFIG) as session:
if data['type'] == 'text':
prompt = data['content']
if text_history:
import re
# تفکیک سابقه متنی به پیام‌های مجزا و انتخاب حداکثر ۲۰ پیام آخر
messages_list = re.split(r'(?=(?:کاربر|هوش مصنوعی):)', text_history)
pruned_messages = [m.strip() for m in messages_list if m.strip()]
pruned_history = "\n".join(pruned_messages[-20:])
prompt = f"تاریخچه مکالمه ما تا الان:\n{pruned_history}\n\nپیام جدید من:\n{data['content']}"
await session.send(input=prompt, end_of_turn=True)
elif data['type'] == 'audio':
audio_bytes = base64.b64decode(data['content'])
mime_type = "audio/pcm;rate=16000"
if len(audio_bytes) > 0:
await session.send(input={"data": audio_bytes, "mime_type": mime_type}, end_of_turn=True)
else:
await websocket.send_json({"status": "error", "message": "فایل صوتی دریافت شده خالی است."})
continue
turn = session.receive()
full_audio = bytearray()
clean_full_text = ""
# مقداردهی کلاس فیلتر جدید
thought_filter = ThoughtFilter()
def process_chunk(msg):
t_chunk = ""
a_chunk = bytearray()
if hasattr(msg, 'server_content') and msg.server_content is not None:
if hasattr(msg.server_content, 'output_transcription') and msg.server_content.output_transcription is not None:
if hasattr(msg.server_content.output_transcription, 'text') and msg.server_content.output_transcription.text:
t_chunk += msg.server_content.output_transcription.text
if msg.server_content.model_turn is not None:
for part in msg.server_content.model_turn.parts:
if part.text: t_chunk += part.text
if part.inline_data and part.inline_data.data: a_chunk.extend(part.inline_data.data)
else:
if getattr(msg, 'text', None): t_chunk += msg.text
if getattr(msg, 'data', None): a_chunk.extend(msg.data)
return t_chunk, a_chunk
try:
await websocket.send_json({"status": "start_message"})
first_response = await asyncio.wait_for(anext(turn), timeout=30.0)
t, a = process_chunk(first_response)
has_audio = bool(a and len(a) > 0)
safe_t = thought_filter.process_chunk(t, has_audio)
if safe_t or a:
clean_full_text += safe_t
full_audio.extend(a)
await websocket.send_json({"status": "streaming", "text": safe_t, "audio": base64.b64encode(a).decode('utf-8') if a else ""})
async for response in turn:
t, a = process_chunk(response)
has_audio = bool(a and len(a) > 0)
safe_t = thought_filter.process_chunk(t, has_audio)
if safe_t or a:
clean_full_text += safe_t
full_audio.extend(a)
await websocket.send_json({"status": "streaming", "text": safe_t, "audio": base64.b64encode(a).decode('utf-8') if a else ""})
except StopAsyncIteration:
pass
# در صورت پایان ارتباط، اگر چیزی در بافر فیلتر مانده بود آزاد شود
final_leftover = thought_filter.flush()
if final_leftover:
clean_full_text += final_leftover
await websocket.send_json({"status": "streaming", "text": final_leftover, "audio": ""})
audio_url = None
if full_audio:
filename = f"chat_{client_id}_{request_id}.wav"
filepath = os.path.join(TEMP_DIR, filename)
with wave.open(filepath, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(full_audio)
audio_url = f"/audio/{filename}"
await websocket.send_json({
"status": "success",
"model_text": clean_full_text.strip(),
"audio_url": audio_url
})
except Exception as e:
print(f"[{client_id}] خطا: {e}")
await websocket.send_json({"status": "error", "message": f"خطا در پردازش: {str(e)}"})
except WebSocketDisconnect:
print(f"کلاینت [{client_id}] اتصال را قطع کرد.")
except Exception:
pass