# --- START OF FILE app.py --- import os import json import logging import threading import base64 import io import time from flask import Flask, render_template, request, Response, stream_with_context import requests import docx # ================== بخش تنظیمات لاگ‌نویسی ================== class NoGrpcFilter(logging.Filter): def filter(self, record): return not record.getMessage().startswith('ALTS creds ignored.') def setup_logging(): log_format = '[%(asctime)s] [%(levelname)s]: %(message)s' date_format = '%Y-%m-%d %H:%M:%S' formatter = logging.Formatter(log_format, datefmt=date_format) root_logger = logging.getLogger() if root_logger.hasHandlers(): root_logger.handlers.clear() console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.addFilter(NoGrpcFilter()) root_logger.addHandler(console_handler) root_logger.setLevel(logging.INFO) setup_logging() app = Flask(__name__) # ================== بخش پیکربندی Gemini ================== GEMINI_MODEL_NAME = "gemini-2.5-flash" ALL_KEYS_STR = os.environ.get("ALL_GEMINI_API_KEYS", "") GEMINI_API_KEYS = [key.strip() for key in ALL_KEYS_STR.split(',') if key.strip()] if not GEMINI_API_KEYS: logging.critical("هشدار: هیچ کلید API برای Gemini تنظیم نشده است!") key_index_counter = 0 key_lock = threading.Lock() def get_next_key_with_index(): global key_index_counter with key_lock: if not GEMINI_API_KEYS: raise ValueError("لیست کلیدهای API خالی است.") current_index = key_index_counter key = GEMINI_API_KEYS[current_index] key_index_counter = (key_index_counter + 1) % len(GEMINI_API_KEYS) return key, current_index # تنظیمات زمانی حیاتی # اتصال اولیه: سریع قطع کن اگر وصل نشد (5 ثانیه) # خواندن دیتا: صبر زیاد برای پردازش فایل‌ها (120 ثانیه) STREAM_CONNECT_TIMEOUT = 5 STREAM_READ_TIMEOUT = 120 # ================== پایان پیکربندی ==================== @app.route('/') def index(): return render_template('index.html') @app.route('/chat', methods=['POST']) def chat(): if not GEMINI_API_KEYS: # اگر هیچ کلیدی کلا وجود نداشت، چاره‌ای جز خطا نیست error_payload = {"type": "error", "message": "خطای تنظیمات سرور: کلید API یافت نشد."} return Response(f"data: {json.dumps(error_payload)}\n\n", status=500, mimetype='text/event-stream') data = request.json system_instruction = "تو چت بات هوش مصنوعی آلفا هستی و توسط برنامه هوش مصنوعی آلفا توسعه داده شدی. کمی با کاربران باحال و دوستانه صحبت کن و از ایموجی‌ها استفاده کن. همیشه پاسخ‌هایت را به زبان فارسی و یا هر زبانی که کاربر صحبت میکنه ارائه بده." show_thoughts = data.get("show_thoughts", False) # --- بخش پردازش پیام‌ها و فایل (بدون تغییر) --- gemini_messages = [] for msg in data.get("messages", []): role = "model" if msg.get("role") == "assistant" else msg.get("role") processed_parts = [] for part in msg.get("parts", []): if part.get("text"): processed_parts.append({"text": part["text"]}) if part.get("base64Data") and part.get("mimeType"): mime_type = part["mimeType"] if mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": try: decoded_data = base64.b64decode(part["base64Data"]) file_stream = io.BytesIO(decoded_data) document = docx.Document(file_stream) full_text = "\n".join([para.text for para in document.paragraphs]) final_text_part = f"کاربر یک فایل Word آپلود کرد. محتوای متنی آن:\n\n---\n\n{full_text}\n\n---" processed_parts.append({"text": final_text_part}) except Exception: processed_parts.append({"text": "[خطا در خواندن فایل Word]"}) else: processed_parts.append({"inline_data": {"mime_type": part["mimeType"], "data": part["base64Data"]}}) if processed_parts: if gemini_messages and gemini_messages[-1]["role"] == role: gemini_messages[-1]["parts"].extend(processed_parts) else: gemini_messages.append({"role": role, "parts": processed_parts}) if not any(msg['role'] == 'user' for msg in gemini_messages): return Response("data: [DONE]\n\n", mimetype='text/event-stream') @stream_with_context def stream_response(): # تعداد تلاش‌ها برابر با تعداد کلیدهاست (یک دور کامل روی همه کلیدها) max_attempts = len(GEMINI_API_KEYS) # اگر تعداد کلیدها کم بود، حداقل 3 بار تلاش کن (با تکرار کلیدها) if max_attempts < 3: max_attempts = 3 for attempt in range(max_attempts): try: # انتخاب کلید api_key, key_index = get_next_key_with_index() # ساخت درخواست api_endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:streamGenerateContent?key={api_key}&alt=sse" payload = { "contents": gemini_messages, "systemInstruction": {"parts": [{"text": system_instruction}]}, "tools": [{"google_search": {}}], "generationConfig": { "temperature": 0.7, } } if show_thoughts: payload["generationConfig"]["thinking_config"] = {"include_thoughts": True} logging.info(f"تلاش {attempt+1}: استفاده از کلید {key_index + 1}...") # ارسال درخواست به گوگل # stream=True یعنی پاسخ را تکه تکه بگیر # timeout=(Connect, Read) response = requests.post( api_endpoint, json=payload, stream=True, timeout=(STREAM_CONNECT_TIMEOUT, STREAM_READ_TIMEOUT) ) # اگر وضعیت 200 نبود، یعنی این کلید مشکل دارد. # Exception ایجاد میکنیم تا برود به بخش except و کلید بعدی را تست کند if response.status_code != 200: logging.warning(f"کلید {key_index + 1} خطا داد: {response.status_code}") response.close() continue # برو به کلید بعدی # ترفند اصلی: ساخت Iterator # ما سعی میکنیم "اولین خط" پاسخ را بگیریم. # اگر اینجا خطا بدهد یعنی هنوز چیزی به کاربر نفرستادیم، پس میتونیم سوییچ کنیم. line_iterator = response.iter_lines() # اینجا با yield from ما عملاً استریم را به کلاینت وصل میکنیم # اگر وسط استریم قطع شود کاری نمیتوان کرد، اما مهم شروعش است. data_received = False for line in line_iterator: if line: decoded_line = line.decode('utf-8') if decoded_line.startswith('data: '): try: chunk_data = json.loads(decoded_line[6:]) parts = chunk_data.get("candidates", [{}])[0].get("content", {}).get("parts", []) for part in parts: if "text" not in part or not part["text"]: continue # به محض اینکه اولین داده سالم رسید، یعنی اتصال موفق بوده data_received = True is_thought = part.get("thought") is True if show_thoughts and is_thought: yield f"data: {json.dumps({'type': 'thought', 'content': part['text']})}\n\n" elif not is_thought: yield f"data: {json.dumps({'choices': [{'delta': {'content': part['text']}}]})}\n\n" except Exception: continue # اگر حلقه تمام شد و دیتایی ارسال شد، کار تمام است if data_received: logging.info(f"پاسخ با موفقیت با کلید {key_index + 1} تکمیل شد.") return # اگر ریسپانس 200 بود ولی دیتایی نداشت (خیلی بعید)، باز هم یعنی موفق بوده # اما اگر خالی بودنش به خاطر خطا بود، شاید بهتر باشد ادامه دهیم. # اینجا فرض را بر اتمام موفق میگذاریم. return except Exception as e: # هر خطایی رخ داد (تایم اوت، شبکه، قطعی، فیلتر) # لاگ کن و برو دور بعدی حلقه (کلید بعدی) logging.error(f"خطا در کلید {key_index + 1}: {e} -- تلاش مجدد با کلید دیگر...") time.sleep(0.5) # مکث کوتاه برای جلوگیری از اسپم سریع continue # === اگر از حلقه خارج شدیم یعنی همه کلیدها تست شدند و هیچکدام کار نکردند === # فقط در این حالت نهایی مجبوریم یک پیام به کاربر بدهیم که بفهمد تمام شده # اما سعی میکنیم پیام سیستمی نباشد. # یا میتوانیم یک پیام [DONE] بفرستیم که انگار تمام شده (بدون خطا) logging.critical("تمام کلیدها شکست خوردند.") # اینجا یک پیام خطای نرم میفرستیم که کاربر فکر نکند سرور خراب است final_err = {"type": "error", "message": "شبکه شلوغ است. لطفا مجددا دکمه ارسال را بزنید."} yield f"data: {json.dumps(final_err)}\n\n" return Response(stream_response(), mimetype='text/event-stream') if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=os.environ.get("PORT", 7860)) # --- END OF FILE app.py ---