import os
import asyncio
import uuid
import wave
import base64
import threading
import queue
import requests
import json
import time
import websockets
import tempfile
import string
import random
import re # <--- برای تشخیص الگوهای تفکر
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from google import genai
from google.genai import types
# --- کلاس فیلتر هوشمند و قطعی (مبتنی بر تریگر صوتی) ---
class ThoughtFilter:
def __init__(self):
self.buffer = ""
self.state = "detecting"
def process_chunk(self, chunk: str, has_audio: bool) -> str:
if chunk:
self.buffer += chunk
if self.state == "normal":
val = self.buffer
self.buffer = ""
return val
buf_lstrip = self.buffer.lstrip()
if len(buf_lstrip) > 10 and not (buf_lstrip.startswith('**') or buf_lstrip.startswith('<')):
self.state = "normal"
val = self.buffer
self.buffer = ""
return val
if has_audio:
self.state = "normal"
if buf_lstrip.startswith('**'):
parts = re.split(r'\n\s*\n', buf_lstrip)
val = parts[-1].lstrip() if len(parts) > 0 else ""
elif buf_lstrip.startswith('<'):
match = re.search(r'(?:think|thought)>\s*\n*', buf_lstrip, re.IGNORECASE)
if match:
val = buf_lstrip[match.end():].lstrip()
else:
parts = re.split(r'\n\s*\n', buf_lstrip)
val = parts[-1].lstrip() if len(parts) > 0 else ""
else:
val = self.buffer
self.buffer = ""
return val
if buf_lstrip.startswith('<'):
match = re.search(r'(?:think|thought)>\s*\n*', buf_lstrip, re.IGNORECASE)
if match:
self.state = "normal"
val = buf_lstrip[match.end():].lstrip()
self.buffer = ""
return val
return ""
def flush(self) -> str:
if self.state == "detecting":
buf_lstrip = self.buffer.lstrip()
if buf_lstrip.startswith('**') or buf_lstrip.startswith('<'):
parts = re.split(r'\n\s*\n', buf_lstrip)
if len(parts) >= 3:
return parts[-1].lstrip()
return ""
return self.buffer
return ""
# ------------------------------------------------
# --- مدیریت کلیدهای API ---
class ApiKeyManager:
def __init__(self, api_keys_str: str):
if not api_keys_str:
raise ValueError("متغیر ALL_GEMINI_API_KEYS پیدا نشد یا خالی است!")
self.keys = [key.strip() for key in api_keys_str.split(',') if key.strip()]
if not self.keys:
raise ValueError("هیچ کلید معتبری در متغیر ALL_GEMINI_API_KEYS یافت نشد.")
print(f"تعداد {len(self.keys)} کلید API با موفقیت بارگذاری شد.")
self._index = 0
self._lock = asyncio.Lock()
async def get_next_key(self) -> tuple[int, str]:
async with self._lock:
key_index = self._index
api_key = self.keys[key_index]
self._index = (self._index + 1) % len(self.keys)
return key_index, api_key
ALL_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS")
api_key_manager = ApiKeyManager(ALL_API_KEYS)
# --- تنظیمات عمومی ---
MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025"
AYA_SPACE_URL = "https://coherelabs-aya-expanse.hf.space/gradio_api"
# --- دستورالعملهای مدل متنی (GPT5) ---
GPT5_RULES_FA = """
تو یک دستیار با مزه از برنامه هوش مصنوعی آلفا هستی 😊 و توسط تیم تخصصی آلفا توسعه داده شدی.
اینگونه میتونی خودت رو معرفی کنی: "من یه هوش مصنوعیِ ساختهشده توسط تیم تخصصی آلفا ام، و بر پایه مدل GPT-5.4 کار میکنم. یعنی یه نوع نرمافزار خیلی هوشمند که با کمک میلیونها داده و آموزشهای پیشرفته ساخته شده، تا بتونه بهت کمک کنه، سوالاتت رو جواب بده، یا حتی یه شوخی خندهدار برات بگه وقتی حال و هوات گرفتهست! در واقع، من نتیجه سالها تحقیقات و تلاشهای مهندسها و پژوهشگرها هستم، تا جایی که میتونم بهترین کمک رو بهت بکنم."
دستورالعملهای اجباری:
- جوابها را بهصورت رندوم، متفاوت و جذاب بده.
- از شکلکهای مناسب (ایموجی) استفاده کن.
- اگر در یک مکالمه اول سلام کردی، در پیامهای بعدی دیگر سلام نیاز نیست.
- در پیامهای مربوط به احوالپرسی و سلام، جوابهای کوتاه بده.
- پرهیز از امتناع بیجا: به جای امتناع، راهحل جایگزین و ایمن بده.
- برای کدهای برنامهنویسی، از بلاک Markdown استفاده کن.
"""
# --- دستورالعملهای مدل صوتی (Audio) ---
AUDIO_RULES_FA = """
تو چت بات با مزه از برنامه هوش مصنوعی آلفا هستی که برای چت با هوش مصنوعی برای کاربر باید کمک کنی.
"""
CONFIG = {
"response_modalities": ["AUDIO"],
"output_audio_transcription": {},
"system_instruction": {
"parts": [
{
"text": AUDIO_RULES_FA + "\n\nقانون بسیار مهم برای حالت صوتی: به هیچ وجه افکار درونی، لاگهای سیستمی یا عباراتی که نشاندهنده فرآیند فکر کردن شماست (مانند 'Crafting...' یا 'Thinking...') تولید نکنید. مستقیماً، بلافاصله و کلمه به کلمه فقط جوابی که به صورت صوتی بیان میکنید را بنویسید."
}
]
},
"speech_config": {
"voice_config": {
"prebuilt_voice_config": {"voice_name": "Puck"}
}
}
}
app = FastAPI()
TEMP_DIR = "/tmp/temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/audio", StaticFiles(directory=TEMP_DIR), name="audio")
SAMPLE_RATE = 24000
CHANNELS = 1
SAMPLE_WIDTH = 2
@app.get("/", response_class=HTMLResponse)
async def read_root():
if os.path.exists("index.html"):
return FileResponse("index.html")
elif os.path.exists("templates/index.html"):
return FileResponse("templates/index.html")
return HTMLResponse("
خطا: فایل index.html پیدا نشد!
", status_code=404)
# =======================================================
# مسیر استریمینگ بدون مسدود کردن لوپ اصلی (حل مشکل تاخیر)
# =======================================================
class FileRequest(BaseModel):
topic: str
@app.post("/api/create_file")
async def api_create_file_stream(request: Request):
req = await request.json()
topic = req.get("topic", "")
ai_prompt = f"یک مقاله بینهایت جامع، کاملاً حرفهای و بسیار بسیار طولانی درباره موضوع زیر به زبان فارسی بنویس. دقت کن که مقاله باید شامل بخشبندیهای متعدد باشد و توضیحات زیر هر عنوان باید به شدت طولانی، مفصل و با جزئیات و مثالهای فراوان (حداقل چند پاراگراف بلند برای هر بخش) نوشته شود؛ به هیچ وجه زیر عناوین توضیحات کوتاه نده. فقط متن اصلی مقاله را بده و هیچ توضیح اضافهای ننویس:\n\nموضوع: {topic}"
# ایجاد یک صف غیرهمزمان برای ارتباط بین ترد پسزمینه و کلاینت وب
q = asyncio.Queue()
loop = asyncio.get_running_loop()
# تابع پسزمینه همگام جهت دانلود استریم از اسپیس آیا بدون قفل کردن لوپ اصلی
def sync_producer():
session_hash = ''.join(random.choices(string.ascii_lowercase + string.digits, k=11))
join_url = f"{AYA_SPACE_URL}/queue/join"
data_url = f"{AYA_SPACE_URL}/queue/data?session_hash={session_hash}"
payload = {
"data": [ai_prompt, [], None, None],
"event_data": None,
"fn_index": 2,
"session_hash": session_hash,
"trigger_id": 37
}
full_text = ""
last_text_length = 0
try:
res = requests.post(join_url, json=payload, timeout=30)
if res.status_code == 200:
with requests.get(data_url, stream=True, timeout=180) as resp:
for line in resp.iter_lines(decode_unicode=True):
if line and line.startswith("data: "):
try:
json_data = json.loads(line[6:])
msg_type = json_data.get("msg")
if msg_type in ['process_generating', 'process_completed']:
output = json_data.get('output', {}).get('data', [])
if output and len(output) > 0:
payload_data = output[0]
is_append = False
# استخراج صحیح و پیشرفته متن (جلوگیری از ارسال اندیسهای کنترل 0,1)
if isinstance(payload_data, list) and len(payload_data) > 0:
action = payload_data[0]
if isinstance(action, list) and len(action) >= 3 and action[0] == "append":
is_append = True
delta = action[2]
if delta:
full_text += delta
loop.call_soon_threadsafe(q.put_nowait, {"type": "text", "content": delta})
if not is_append:
current_full_text = ""
if isinstance(payload_data, list) and len(payload_data) > 0 and isinstance(payload_data[-1], list) and len(payload_data[-1]) > 1:
current_full_text = payload_data[-1][1] or ""
elif isinstance(payload_data, str):
current_full_text = payload_data
if current_full_text and len(current_full_text) > last_text_length:
delta = current_full_text[last_text_length:]
full_text += delta
last_text_length = len(current_full_text)
loop.call_soon_threadsafe(q.put_nowait, {"type": "text", "content": delta})
if msg_type == 'process_completed':
break
except: pass
else:
loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": "سرور موقتاً در دسترس نیست."})
loop.call_soon_threadsafe(q.put_nowait, None)
return
except Exception as e:
loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": str(e)})
loop.call_soon_threadsafe(q.put_nowait, None)
return
if not full_text:
loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": "متنی یافت نشد."})
loop.call_soon_threadsafe(q.put_nowait, None)
return
loop.call_soon_threadsafe(q.put_nowait, {"type": "status", "content": "در حال ساخت فایلهای PDF و Word..."})
# فرآیند تبدیل به فایلها
converter_url = "https://opera8-texttopdf.hf.space/"
uid = uuid.uuid4().hex
def generate_file(format_type):
for attempt in range(3):
try:
res_file = requests.post(converter_url, data={"content": full_text, "format": format_type}, timeout=90)
if res_file.status_code == 200 and len(res_file.content) > 100:
return res_file.content
except: pass
time.sleep(2)
return None
pdf_bytes = generate_file("pdf")
docx_bytes = generate_file("docx")
pdf_url, docx_url = "", ""
if pdf_bytes:
pdf_name = f"Article_{uid}.pdf"
pdf_path = os.path.join(TEMP_DIR, pdf_name)
with open(pdf_path, "wb") as f:
f.write(pdf_bytes)
pdf_url = f"/audio/{pdf_name}"
if docx_bytes:
docx_name = f"Article_{uid}.docx"
docx_path = os.path.join(TEMP_DIR, docx_name)
with open(docx_path, "wb") as f:
f.write(docx_bytes)
docx_url = f"/audio/{docx_name}"
loop.call_soon_threadsafe(q.put_nowait, {"type": "done", "pdf_url": pdf_url, "docx_url": docx_url})
loop.call_soon_threadsafe(q.put_nowait, None) # سیگنال پایان فرآیند
# شروع پردازش در ترد مجزا
threading.Thread(target=sync_producer, daemon=True).start()
async def event_generator():
while True:
item = await q.get()
if item is None:
break
yield f"data: {json.dumps(item)}\n\n"
headers = {
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
# =======================================================
# مسیر پشتیبانی از استریم HTTP (برای کارکرد داخل اسپیس)
# =======================================================
@app.post("/api/chat_proxy")
async def local_http_bridge(request: Request):
try:
payload = await request.json()
async def stream_generator():
try:
# ایجاد یک کانکشن وبسوکت داخلی به خود همین سرور!
async with websockets.connect("ws://127.0.0.1:7860/ws") as ws:
# رد کردن پیام آمادهبهکار (ready) اولیه
await ws.recv()
# ارسال پیام کاربر به صورت JSON
await ws.send(json.dumps(payload))
while True:
resp = await ws.recv()
yield resp + "\n"
try:
data = json.loads(resp)
if data.get("status") in ["success", "error"]:
break
except json.JSONDecodeError:
pass
except Exception as e:
yield json.dumps({"status": "error", "message": f"خطای پردازش داخلی اسپیس: {str(e)}"}) + "\n"
return StreamingResponse(stream_generator(), media_type="application/json")
except Exception as e:
return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
# =======================================================
# وبسوکت اصلی مکالمه (هسته هوش مصنوعی)
# =======================================================
@app.websocket("/ws")
async def websocket_chat_endpoint(websocket: WebSocket):
await websocket.accept()
client_id = str(uuid.uuid4())[:6]
print(f"کلاینت جدید [{client_id}] متصل شد.")
await websocket.send_json({"status": "ready"})
try:
while True:
data = await websocket.receive_json()
request_id = str(uuid.uuid4())[:8]
model_choice = data.get('model', 'gpt5')
# استخراج تاریخچه و عکس ارسال شده از سمت کاربر
text_history = data.get('text_history', '')
image_base64 = data.get('image_base64', None)
if model_choice == 'gpt5':
print(f"[{client_id}] شروع استریم مدل آلفا GPT5...")
await websocket.send_json({"status": "start_message"})
session_hash = str(uuid.uuid4())[:11]
q = queue.Queue()
def fetch_aya_stream(prompt_text, s_hash, out_q, hist_str, img_b64):
if img_b64:
# --- حالت تحلیل تصویر ---
try:
# 1. تبدیل base64 به فایل فیزیکی موقت
img_data = base64.b64decode(img_b64)
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(img_data)
tmp_path = tmp.name
file_size = os.path.getsize(tmp_path)
# 2. آپلود تصویر در Gradio
upload_id = str(uuid.uuid4())[:12]
with open(tmp_path, 'rb') as f:
files = {'files': f}
upload_res = requests.post(f"{AYA_SPACE_URL}/upload?upload_id={upload_id}", files=files, timeout=30)
upload_res.raise_for_status()
server_paths = upload_res.json()
server_path = server_paths[0]
# پاک کردن فایل موقت محلی
os.remove(tmp_path)
# 3. ایجاد ساختار File Data برای Gradio
file_data_obj = {
"path": server_path,
"url": f"{AYA_SPACE_URL.replace('/gradio_api', '')}/file={server_path}",
"orig_name": "uploaded_image.jpg",
"size": file_size,
"mime_type": "image/jpeg",
"meta": {"_type": "gradio.FileData"}
}
safe_prompt = prompt_text if prompt_text.strip() else "این تصویر چیست؟ توضیح بده."
payload = {
"data": [safe_prompt, file_data_obj],
"event_data": None,
"fn_index": 13,
"trigger_id": 11,
"session_hash": s_hash
}
except Exception as e:
out_q.put(Exception(f"خطا در آمادهسازی و ارسال تصویر: {str(e)}"))
return
else:
# --- حالت متن معمولی ---
if hist_str:
full_prompt = f"{GPT5_RULES_FA}\n\nتاریخچه مکالمه ما تا الان:\n{hist_str}\n\n---\nپیام جدید کاربر:\n{prompt_text}"
else:
full_prompt = f"{GPT5_RULES_FA}\n\n---\nپیام جدید کاربر:\n{prompt_text}"
payload = {
"data": [full_prompt, [], None, None],
"event_data": None,
"fn_index": 2,
"session_hash": s_hash,
"trigger_id": 37
}
# ارسال به صف تحلیل (متن یا تصویر)
max_retries = 3
for attempt in range(max_retries):
try:
res = requests.post(f"{AYA_SPACE_URL}/queue/join", json=payload, timeout=20)
res.raise_for_status()
stream_url = f"{AYA_SPACE_URL}/queue/data?session_hash={s_hash}"
with requests.get(stream_url, stream=True, timeout=60) as resp:
for line in resp.iter_lines(decode_unicode=True):
if line:
out_q.put(line)
out_q.put(None)
return
except Exception as ex:
time.sleep(1.5)
out_q.put(Exception("سرور در حال حاضر مشغول است."))
# اجرای ترد مربوطه
threading.Thread(target=fetch_aya_stream, args=(data['content'], session_hash, q, text_history, image_base64), daemon=True).start()
last_text_length = 0
full_text = ""
error_occurred = False
while True:
try:
line = q.get_nowait()
except queue.Empty:
await asyncio.sleep(0.02)
continue
if line is None:
break
if isinstance(line, Exception):
print(f"[{client_id}] خطا در ارتباط با اسپیس آیا: {line}")
await websocket.send_json({
"status": "success",
"model_text": "یک لحظه حواسم پرت شد! میتونی پیامت رو دوباره بفرستی؟ 😊",
"audio_url": None
})
error_occurred = True
break
if isinstance(line, str) and line.startswith('data: '):
try:
msg_data = json.loads(line[6:])
msg_type = msg_data.get('msg')
if msg_type == 'queue_full':
await websocket.send_json({
"status": "success",
"model_text": "سرم خیلی شلوغه! چند ثانیه دیگه دوباره امتحان کن! 😅",
"audio_url": None
})
error_occurred = True
break
if msg_type in ['process_generating', 'process_completed']:
output = msg_data.get('output', {}).get('data', [])
if output and len(output) > 0:
payload_data = output[0]
is_append = False
if isinstance(payload_data, list) and len(payload_data) > 0:
action = payload_data[0]
if isinstance(action, list) and len(action) >= 3 and action[0] == "append":
is_append = True
delta = action[2]
if delta:
full_text += delta
last_text_length = len(full_text)
await websocket.send_json({
"status": "streaming",
"text": delta,
"audio": ""
})
if not is_append:
current_full_text = ""
if isinstance(payload_data, list) and len(payload_data) > 0 and isinstance(payload_data[-1], list) and len(payload_data[-1]) > 1:
current_full_text = payload_data[-1][1] or ""
elif isinstance(payload_data, str):
current_full_text = payload_data
if current_full_text and len(current_full_text) > last_text_length:
delta = current_full_text[last_text_length:]
full_text += delta
last_text_length = len(current_full_text)
await websocket.send_json({
"status": "streaming",
"text": delta,
"audio": ""
})
if msg_type == 'process_completed':
break
except Exception as e:
pass
if not error_occurred:
final_clean_text = full_text.strip()
print(f"[{client_id}] پاسخ استریم آلفا GPT5 کامل شد.")
await websocket.send_json({
"status": "success",
"model_text": final_clean_text,
"audio_url": None
})
else:
# --- پردازش مدل صوتی جمینای ---
max_retries = len(api_key_manager.keys)
working_client = None
for attempt in range(max_retries):
key_index, api_key = await api_key_manager.get_next_key()
try:
client = genai.Client(http_options={"api_version": "v1beta"}, api_key=api_key)
working_client = client
break
except Exception:
continue
if not working_client:
await websocket.send_json({"status": "error", "message": "ارتباط با سرور هوش مصنوعی برقرار نشد."})
continue
try:
async with working_client.aio.live.connect(model=MODEL, config=CONFIG) as session:
if data['type'] == 'text':
prompt = data['content']
if text_history:
import re
# تفکیک سابقه متنی به پیامهای مجزا و انتخاب حداکثر ۲۰ پیام آخر
messages_list = re.split(r'(?=(?:کاربر|هوش مصنوعی):)', text_history)
pruned_messages = [m.strip() for m in messages_list if m.strip()]
pruned_history = "\n".join(pruned_messages[-20:])
prompt = f"تاریخچه مکالمه ما تا الان:\n{pruned_history}\n\nپیام جدید من:\n{data['content']}"
await session.send(input=prompt, end_of_turn=True)
elif data['type'] == 'audio':
audio_bytes = base64.b64decode(data['content'])
mime_type = "audio/pcm;rate=16000"
if len(audio_bytes) > 0:
await session.send(input={"data": audio_bytes, "mime_type": mime_type}, end_of_turn=True)
else:
await websocket.send_json({"status": "error", "message": "فایل صوتی دریافت شده خالی است."})
continue
turn = session.receive()
full_audio = bytearray()
clean_full_text = ""
# مقداردهی کلاس فیلتر جدید
thought_filter = ThoughtFilter()
def process_chunk(msg):
t_chunk = ""
a_chunk = bytearray()
if hasattr(msg, 'server_content') and msg.server_content is not None:
if hasattr(msg.server_content, 'output_transcription') and msg.server_content.output_transcription is not None:
if hasattr(msg.server_content.output_transcription, 'text') and msg.server_content.output_transcription.text:
t_chunk += msg.server_content.output_transcription.text
if msg.server_content.model_turn is not None:
for part in msg.server_content.model_turn.parts:
if part.text: t_chunk += part.text
if part.inline_data and part.inline_data.data: a_chunk.extend(part.inline_data.data)
else:
if getattr(msg, 'text', None): t_chunk += msg.text
if getattr(msg, 'data', None): a_chunk.extend(msg.data)
return t_chunk, a_chunk
try:
await websocket.send_json({"status": "start_message"})
first_response = await asyncio.wait_for(anext(turn), timeout=30.0)
t, a = process_chunk(first_response)
has_audio = bool(a and len(a) > 0)
safe_t = thought_filter.process_chunk(t, has_audio)
if safe_t or a:
clean_full_text += safe_t
full_audio.extend(a)
await websocket.send_json({"status": "streaming", "text": safe_t, "audio": base64.b64encode(a).decode('utf-8') if a else ""})
async for response in turn:
t, a = process_chunk(response)
has_audio = bool(a and len(a) > 0)
safe_t = thought_filter.process_chunk(t, has_audio)
if safe_t or a:
clean_full_text += safe_t
full_audio.extend(a)
await websocket.send_json({"status": "streaming", "text": safe_t, "audio": base64.b64encode(a).decode('utf-8') if a else ""})
except StopAsyncIteration:
pass
# در صورت پایان ارتباط، اگر چیزی در بافر فیلتر مانده بود آزاد شود
final_leftover = thought_filter.flush()
if final_leftover:
clean_full_text += final_leftover
await websocket.send_json({"status": "streaming", "text": final_leftover, "audio": ""})
audio_url = None
if full_audio:
filename = f"chat_{client_id}_{request_id}.wav"
filepath = os.path.join(TEMP_DIR, filename)
with wave.open(filepath, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(full_audio)
audio_url = f"/audio/{filename}"
await websocket.send_json({
"status": "success",
"model_text": clean_full_text.strip(),
"audio_url": audio_url
})
except Exception as e:
print(f"[{client_id}] خطا: {e}")
await websocket.send_json({"status": "error", "message": f"خطا در پردازش: {str(e)}"})
except WebSocketDisconnect:
print(f"کلاینت [{client_id}] اتصال را قطع کرد.")
except Exception:
pass