import os import asyncio import uuid import wave import base64 import threading import queue import requests import json import time import websockets import tempfile import string import random import re # <--- برای تشخیص الگوهای تفکر from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from google import genai from google.genai import types # --- کلاس فیلتر هوشمند و قطعی (مبتنی بر تریگر صوتی) --- class ThoughtFilter: def __init__(self): self.buffer = "" self.state = "detecting" def process_chunk(self, chunk: str, has_audio: bool) -> str: if chunk: self.buffer += chunk if self.state == "normal": val = self.buffer self.buffer = "" return val buf_lstrip = self.buffer.lstrip() if len(buf_lstrip) > 10 and not (buf_lstrip.startswith('**') or buf_lstrip.startswith('<')): self.state = "normal" val = self.buffer self.buffer = "" return val if has_audio: self.state = "normal" if buf_lstrip.startswith('**'): parts = re.split(r'\n\s*\n', buf_lstrip) val = parts[-1].lstrip() if len(parts) > 0 else "" elif buf_lstrip.startswith('<'): match = re.search(r'\s*\n*', buf_lstrip, re.IGNORECASE) if match: val = buf_lstrip[match.end():].lstrip() else: parts = re.split(r'\n\s*\n', buf_lstrip) val = parts[-1].lstrip() if len(parts) > 0 else "" else: val = self.buffer self.buffer = "" return val if buf_lstrip.startswith('<'): match = re.search(r'\s*\n*', buf_lstrip, re.IGNORECASE) if match: self.state = "normal" val = buf_lstrip[match.end():].lstrip() self.buffer = "" return val return "" def flush(self) -> str: if self.state == "detecting": buf_lstrip = self.buffer.lstrip() if buf_lstrip.startswith('**') or buf_lstrip.startswith('<'): parts = re.split(r'\n\s*\n', buf_lstrip) if len(parts) >= 3: return parts[-1].lstrip() return "" return self.buffer return "" # ------------------------------------------------ # --- مدیریت کلیدهای API --- class ApiKeyManager: def __init__(self, api_keys_str: str): if not api_keys_str: raise ValueError("متغیر ALL_GEMINI_API_KEYS پیدا نشد یا خالی است!") self.keys = [key.strip() for key in api_keys_str.split(',') if key.strip()] if not self.keys: raise ValueError("هیچ کلید معتبری در متغیر ALL_GEMINI_API_KEYS یافت نشد.") print(f"تعداد {len(self.keys)} کلید API با موفقیت بارگذاری شد.") self._index = 0 self._lock = asyncio.Lock() async def get_next_key(self) -> tuple[int, str]: async with self._lock: key_index = self._index api_key = self.keys[key_index] self._index = (self._index + 1) % len(self.keys) return key_index, api_key ALL_API_KEYS = os.environ.get("ALL_GEMINI_API_KEYS") api_key_manager = ApiKeyManager(ALL_API_KEYS) # --- تنظیمات عمومی --- MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025" AYA_SPACE_URL = "https://coherelabs-aya-expanse.hf.space/gradio_api" # --- دستورالعمل‌های مدل متنی (GPT5) --- GPT5_RULES_FA = """ تو یک دستیار با مزه از برنامه هوش مصنوعی آلفا هستی 😊 و توسط تیم تخصصی آلفا توسعه داده شدی. اینگونه می‌تونی خودت رو معرفی کنی: "من یه هوش مصنوعیِ ساخته‌شده توسط تیم‌ تخصصی آلفا ام، و بر پایه مدل GPT-5.4 کار می‌کنم. یعنی یه نوع نرم‌افزار خیلی هوشمند که با کمک میلیون‌ها داده و آموزش‌های پیشرفته ساخته شده، تا بتونه بهت کمک کنه، سوالاتت رو جواب بده، یا حتی یه شوخی خنده‌دار برات بگه وقتی حال و هوات گرفته‌ست! در واقع، من نتیجه سال‌ها تحقیقات و تلاش‌های مهندس‌ها و پژوهشگرها هستم، تا جایی که می‌تونم بهترین کمک رو بهت بکنم." دستورالعمل‌های اجباری: - جواب‌ها را به‌صورت رندوم، متفاوت و جذاب بده. - از شکلک‌های مناسب (ایموجی) استفاده کن. - اگر در یک مکالمه اول سلام کردی، در پیام‌های بعدی دیگر سلام نیاز نیست. - در پیام‌های مربوط به احوال‌پرسی و سلام، جواب‌های کوتاه بده. - پرهیز از امتناع بی‌جا: به جای امتناع، راه‌حل جایگزین و ایمن بده. - برای کدهای برنامه‌نویسی، از بلاک Markdown استفاده کن. """ # --- دستورالعمل‌های مدل صوتی (Audio) --- AUDIO_RULES_FA = """ تو چت بات با مزه از برنامه هوش مصنوعی آلفا هستی که برای چت با هوش مصنوعی برای کاربر باید کمک کنی. """ CONFIG = { "response_modalities": ["AUDIO"], "output_audio_transcription": {}, "system_instruction": { "parts": [ { "text": AUDIO_RULES_FA + "\n\nقانون بسیار مهم برای حالت صوتی: به هیچ وجه افکار درونی، لاگ‌های سیستمی یا عباراتی که نشان‌دهنده فرآیند فکر کردن شماست (مانند 'Crafting...' یا 'Thinking...') تولید نکنید. مستقیماً، بلافاصله و کلمه به کلمه فقط جوابی که به صورت صوتی بیان می‌کنید را بنویسید." } ] }, "speech_config": { "voice_config": { "prebuilt_voice_config": {"voice_name": "Puck"} } } } app = FastAPI() TEMP_DIR = "/tmp/temp_audio" os.makedirs(TEMP_DIR, exist_ok=True) app.mount("/audio", StaticFiles(directory=TEMP_DIR), name="audio") SAMPLE_RATE = 24000 CHANNELS = 1 SAMPLE_WIDTH = 2 @app.get("/", response_class=HTMLResponse) async def read_root(): if os.path.exists("index.html"): return FileResponse("index.html") elif os.path.exists("templates/index.html"): return FileResponse("templates/index.html") return HTMLResponse("

خطا: فایل index.html پیدا نشد!

", status_code=404) # ======================================================= # مسیر استریمینگ بدون مسدود کردن لوپ اصلی (حل مشکل تاخیر) # ======================================================= class FileRequest(BaseModel): topic: str @app.post("/api/create_file") async def api_create_file_stream(request: Request): req = await request.json() topic = req.get("topic", "") ai_prompt = f"یک مقاله بی‌نهایت جامع، کاملاً حرفه‌ای و بسیار بسیار طولانی درباره موضوع زیر به زبان فارسی بنویس. دقت کن که مقاله باید شامل بخش‌بندی‌های متعدد باشد و توضیحات زیر هر عنوان باید به شدت طولانی، مفصل و با جزئیات و مثال‌های فراوان (حداقل چند پاراگراف بلند برای هر بخش) نوشته شود؛ به هیچ وجه زیر عناوین توضیحات کوتاه نده. فقط متن اصلی مقاله را بده و هیچ توضیح اضافه‌ای ننویس:\n\nموضوع: {topic}" # ایجاد یک صف غیرهمزمان برای ارتباط بین ترد پس‌زمینه و کلاینت وب q = asyncio.Queue() loop = asyncio.get_running_loop() # تابع پس‌زمینه همگام جهت دانلود استریم از اسپیس آیا بدون قفل کردن لوپ اصلی def sync_producer(): session_hash = ''.join(random.choices(string.ascii_lowercase + string.digits, k=11)) join_url = f"{AYA_SPACE_URL}/queue/join" data_url = f"{AYA_SPACE_URL}/queue/data?session_hash={session_hash}" payload = { "data": [ai_prompt, [], None, None], "event_data": None, "fn_index": 2, "session_hash": session_hash, "trigger_id": 37 } full_text = "" last_text_length = 0 try: res = requests.post(join_url, json=payload, timeout=30) if res.status_code == 200: with requests.get(data_url, stream=True, timeout=180) as resp: for line in resp.iter_lines(decode_unicode=True): if line and line.startswith("data: "): try: json_data = json.loads(line[6:]) msg_type = json_data.get("msg") if msg_type in ['process_generating', 'process_completed']: output = json_data.get('output', {}).get('data', []) if output and len(output) > 0: payload_data = output[0] is_append = False # استخراج صحیح و پیشرفته متن (جلوگیری از ارسال اندیس‌های کنترل 0,1) if isinstance(payload_data, list) and len(payload_data) > 0: action = payload_data[0] if isinstance(action, list) and len(action) >= 3 and action[0] == "append": is_append = True delta = action[2] if delta: full_text += delta loop.call_soon_threadsafe(q.put_nowait, {"type": "text", "content": delta}) if not is_append: current_full_text = "" if isinstance(payload_data, list) and len(payload_data) > 0 and isinstance(payload_data[-1], list) and len(payload_data[-1]) > 1: current_full_text = payload_data[-1][1] or "" elif isinstance(payload_data, str): current_full_text = payload_data if current_full_text and len(current_full_text) > last_text_length: delta = current_full_text[last_text_length:] full_text += delta last_text_length = len(current_full_text) loop.call_soon_threadsafe(q.put_nowait, {"type": "text", "content": delta}) if msg_type == 'process_completed': break except: pass else: loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": "سرور موقتاً در دسترس نیست."}) loop.call_soon_threadsafe(q.put_nowait, None) return except Exception as e: loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": str(e)}) loop.call_soon_threadsafe(q.put_nowait, None) return if not full_text: loop.call_soon_threadsafe(q.put_nowait, {"type": "error", "message": "متنی یافت نشد."}) loop.call_soon_threadsafe(q.put_nowait, None) return loop.call_soon_threadsafe(q.put_nowait, {"type": "status", "content": "در حال ساخت فایل‌های PDF و Word..."}) # فرآیند تبدیل به فایل‌ها converter_url = "https://opera8-texttopdf.hf.space/" uid = uuid.uuid4().hex def generate_file(format_type): for attempt in range(3): try: res_file = requests.post(converter_url, data={"content": full_text, "format": format_type}, timeout=90) if res_file.status_code == 200 and len(res_file.content) > 100: return res_file.content except: pass time.sleep(2) return None pdf_bytes = generate_file("pdf") docx_bytes = generate_file("docx") pdf_url, docx_url = "", "" if pdf_bytes: pdf_name = f"Article_{uid}.pdf" pdf_path = os.path.join(TEMP_DIR, pdf_name) with open(pdf_path, "wb") as f: f.write(pdf_bytes) pdf_url = f"/audio/{pdf_name}" if docx_bytes: docx_name = f"Article_{uid}.docx" docx_path = os.path.join(TEMP_DIR, docx_name) with open(docx_path, "wb") as f: f.write(docx_bytes) docx_url = f"/audio/{docx_name}" loop.call_soon_threadsafe(q.put_nowait, {"type": "done", "pdf_url": pdf_url, "docx_url": docx_url}) loop.call_soon_threadsafe(q.put_nowait, None) # سیگنال پایان فرآیند # شروع پردازش در ترد مجزا threading.Thread(target=sync_producer, daemon=True).start() async def event_generator(): while True: item = await q.get() if item is None: break yield f"data: {json.dumps(item)}\n\n" headers = { "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive", "X-Accel-Buffering": "no" } return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers) # ======================================================= # مسیر پشتیبانی از استریم HTTP (برای کارکرد داخل اسپیس) # ======================================================= @app.post("/api/chat_proxy") async def local_http_bridge(request: Request): try: payload = await request.json() async def stream_generator(): try: # ایجاد یک کانکشن وب‌سوکت داخلی به خود همین سرور! async with websockets.connect("ws://127.0.0.1:7860/ws") as ws: # رد کردن پیام آماده‌به‌کار (ready) اولیه await ws.recv() # ارسال پیام کاربر به صورت JSON await ws.send(json.dumps(payload)) while True: resp = await ws.recv() yield resp + "\n" try: data = json.loads(resp) if data.get("status") in ["success", "error"]: break except json.JSONDecodeError: pass except Exception as e: yield json.dumps({"status": "error", "message": f"خطای پردازش داخلی اسپیس: {str(e)}"}) + "\n" return StreamingResponse(stream_generator(), media_type="application/json") except Exception as e: return JSONResponse({"status": "error", "message": str(e)}, status_code=500) # ======================================================= # وب‌سوکت اصلی مکالمه (هسته هوش مصنوعی) # ======================================================= @app.websocket("/ws") async def websocket_chat_endpoint(websocket: WebSocket): await websocket.accept() client_id = str(uuid.uuid4())[:6] print(f"کلاینت جدید [{client_id}] متصل شد.") await websocket.send_json({"status": "ready"}) try: while True: data = await websocket.receive_json() request_id = str(uuid.uuid4())[:8] model_choice = data.get('model', 'gpt5') # استخراج تاریخچه و عکس ارسال شده از سمت کاربر text_history = data.get('text_history', '') image_base64 = data.get('image_base64', None) if model_choice == 'gpt5': print(f"[{client_id}] شروع استریم مدل آلفا GPT5...") await websocket.send_json({"status": "start_message"}) session_hash = str(uuid.uuid4())[:11] q = queue.Queue() def fetch_aya_stream(prompt_text, s_hash, out_q, hist_str, img_b64): if img_b64: # --- حالت تحلیل تصویر --- try: # 1. تبدیل base64 به فایل فیزیکی موقت img_data = base64.b64decode(img_b64) with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp: tmp.write(img_data) tmp_path = tmp.name file_size = os.path.getsize(tmp_path) # 2. آپلود تصویر در Gradio upload_id = str(uuid.uuid4())[:12] with open(tmp_path, 'rb') as f: files = {'files': f} upload_res = requests.post(f"{AYA_SPACE_URL}/upload?upload_id={upload_id}", files=files, timeout=30) upload_res.raise_for_status() server_paths = upload_res.json() server_path = server_paths[0] # پاک کردن فایل موقت محلی os.remove(tmp_path) # 3. ایجاد ساختار File Data برای Gradio file_data_obj = { "path": server_path, "url": f"{AYA_SPACE_URL.replace('/gradio_api', '')}/file={server_path}", "orig_name": "uploaded_image.jpg", "size": file_size, "mime_type": "image/jpeg", "meta": {"_type": "gradio.FileData"} } safe_prompt = prompt_text if prompt_text.strip() else "این تصویر چیست؟ توضیح بده." payload = { "data": [safe_prompt, file_data_obj], "event_data": None, "fn_index": 13, "trigger_id": 11, "session_hash": s_hash } except Exception as e: out_q.put(Exception(f"خطا در آماده‌سازی و ارسال تصویر: {str(e)}")) return else: # --- حالت متن معمولی --- if hist_str: full_prompt = f"{GPT5_RULES_FA}\n\nتاریخچه مکالمه ما تا الان:\n{hist_str}\n\n---\nپیام جدید کاربر:\n{prompt_text}" else: full_prompt = f"{GPT5_RULES_FA}\n\n---\nپیام جدید کاربر:\n{prompt_text}" payload = { "data": [full_prompt, [], None, None], "event_data": None, "fn_index": 2, "session_hash": s_hash, "trigger_id": 37 } # ارسال به صف تحلیل (متن یا تصویر) max_retries = 3 for attempt in range(max_retries): try: res = requests.post(f"{AYA_SPACE_URL}/queue/join", json=payload, timeout=20) res.raise_for_status() stream_url = f"{AYA_SPACE_URL}/queue/data?session_hash={s_hash}" with requests.get(stream_url, stream=True, timeout=60) as resp: for line in resp.iter_lines(decode_unicode=True): if line: out_q.put(line) out_q.put(None) return except Exception as ex: time.sleep(1.5) out_q.put(Exception("سرور در حال حاضر مشغول است.")) # اجرای ترد مربوطه threading.Thread(target=fetch_aya_stream, args=(data['content'], session_hash, q, text_history, image_base64), daemon=True).start() last_text_length = 0 full_text = "" error_occurred = False while True: try: line = q.get_nowait() except queue.Empty: await asyncio.sleep(0.02) continue if line is None: break if isinstance(line, Exception): print(f"[{client_id}] خطا در ارتباط با اسپیس آیا: {line}") await websocket.send_json({ "status": "success", "model_text": "یک لحظه حواسم پرت شد! می‌تونی پیامت رو دوباره بفرستی؟ 😊", "audio_url": None }) error_occurred = True break if isinstance(line, str) and line.startswith('data: '): try: msg_data = json.loads(line[6:]) msg_type = msg_data.get('msg') if msg_type == 'queue_full': await websocket.send_json({ "status": "success", "model_text": "سرم خیلی شلوغه! چند ثانیه دیگه دوباره امتحان کن! 😅", "audio_url": None }) error_occurred = True break if msg_type in ['process_generating', 'process_completed']: output = msg_data.get('output', {}).get('data', []) if output and len(output) > 0: payload_data = output[0] is_append = False if isinstance(payload_data, list) and len(payload_data) > 0: action = payload_data[0] if isinstance(action, list) and len(action) >= 3 and action[0] == "append": is_append = True delta = action[2] if delta: full_text += delta last_text_length = len(full_text) await websocket.send_json({ "status": "streaming", "text": delta, "audio": "" }) if not is_append: current_full_text = "" if isinstance(payload_data, list) and len(payload_data) > 0 and isinstance(payload_data[-1], list) and len(payload_data[-1]) > 1: current_full_text = payload_data[-1][1] or "" elif isinstance(payload_data, str): current_full_text = payload_data if current_full_text and len(current_full_text) > last_text_length: delta = current_full_text[last_text_length:] full_text += delta last_text_length = len(current_full_text) await websocket.send_json({ "status": "streaming", "text": delta, "audio": "" }) if msg_type == 'process_completed': break except Exception as e: pass if not error_occurred: final_clean_text = full_text.strip() print(f"[{client_id}] پاسخ استریم آلفا GPT5 کامل شد.") await websocket.send_json({ "status": "success", "model_text": final_clean_text, "audio_url": None }) else: # --- پردازش مدل صوتی جمینای --- max_retries = len(api_key_manager.keys) working_client = None for attempt in range(max_retries): key_index, api_key = await api_key_manager.get_next_key() try: client = genai.Client(http_options={"api_version": "v1beta"}, api_key=api_key) working_client = client break except Exception: continue if not working_client: await websocket.send_json({"status": "error", "message": "ارتباط با سرور هوش مصنوعی برقرار نشد."}) continue try: async with working_client.aio.live.connect(model=MODEL, config=CONFIG) as session: if data['type'] == 'text': prompt = data['content'] if text_history: import re # تفکیک سابقه متنی به پیام‌های مجزا و انتخاب حداکثر ۲۰ پیام آخر messages_list = re.split(r'(?=(?:کاربر|هوش مصنوعی):)', text_history) pruned_messages = [m.strip() for m in messages_list if m.strip()] pruned_history = "\n".join(pruned_messages[-20:]) prompt = f"تاریخچه مکالمه ما تا الان:\n{pruned_history}\n\nپیام جدید من:\n{data['content']}" await session.send(input=prompt, end_of_turn=True) elif data['type'] == 'audio': audio_bytes = base64.b64decode(data['content']) mime_type = "audio/pcm;rate=16000" if len(audio_bytes) > 0: await session.send(input={"data": audio_bytes, "mime_type": mime_type}, end_of_turn=True) else: await websocket.send_json({"status": "error", "message": "فایل صوتی دریافت شده خالی است."}) continue turn = session.receive() full_audio = bytearray() clean_full_text = "" # مقداردهی کلاس فیلتر جدید thought_filter = ThoughtFilter() def process_chunk(msg): t_chunk = "" a_chunk = bytearray() if hasattr(msg, 'server_content') and msg.server_content is not None: if hasattr(msg.server_content, 'output_transcription') and msg.server_content.output_transcription is not None: if hasattr(msg.server_content.output_transcription, 'text') and msg.server_content.output_transcription.text: t_chunk += msg.server_content.output_transcription.text if msg.server_content.model_turn is not None: for part in msg.server_content.model_turn.parts: if part.text: t_chunk += part.text if part.inline_data and part.inline_data.data: a_chunk.extend(part.inline_data.data) else: if getattr(msg, 'text', None): t_chunk += msg.text if getattr(msg, 'data', None): a_chunk.extend(msg.data) return t_chunk, a_chunk try: await websocket.send_json({"status": "start_message"}) first_response = await asyncio.wait_for(anext(turn), timeout=30.0) t, a = process_chunk(first_response) has_audio = bool(a and len(a) > 0) safe_t = thought_filter.process_chunk(t, has_audio) if safe_t or a: clean_full_text += safe_t full_audio.extend(a) await websocket.send_json({"status": "streaming", "text": safe_t, "audio": base64.b64encode(a).decode('utf-8') if a else ""}) async for response in turn: t, a = process_chunk(response) has_audio = bool(a and len(a) > 0) safe_t = thought_filter.process_chunk(t, has_audio) if safe_t or a: clean_full_text += safe_t full_audio.extend(a) await websocket.send_json({"status": "streaming", "text": safe_t, "audio": base64.b64encode(a).decode('utf-8') if a else ""}) except StopAsyncIteration: pass # در صورت پایان ارتباط، اگر چیزی در بافر فیلتر مانده بود آزاد شود final_leftover = thought_filter.flush() if final_leftover: clean_full_text += final_leftover await websocket.send_json({"status": "streaming", "text": final_leftover, "audio": ""}) audio_url = None if full_audio: filename = f"chat_{client_id}_{request_id}.wav" filepath = os.path.join(TEMP_DIR, filename) with wave.open(filepath, 'wb') as wf: wf.setnchannels(CHANNELS) wf.setsampwidth(SAMPLE_WIDTH) wf.setframerate(SAMPLE_RATE) wf.writeframes(full_audio) audio_url = f"/audio/{filename}" await websocket.send_json({ "status": "success", "model_text": clean_full_text.strip(), "audio_url": audio_url }) except Exception as e: print(f"[{client_id}] خطا: {e}") await websocket.send_json({"status": "error", "message": f"خطا در پردازش: {str(e)}"}) except WebSocketDisconnect: print(f"کلاینت [{client_id}] اتصال را قطع کرد.") except Exception: pass