Chat / app.py
Jan2000's picture
Update app.py
b53a67a unverified
raw
history blame
11.7 kB
# --- START OF FILE app.py ---
import os
import json
import logging
import threading
import base64
import io
import time
from flask import Flask, render_template, request, Response, stream_with_context
import requests
import docx
# ================== بخش تنظیمات لاگ‌نویسی ==================
class NoGrpcFilter(logging.Filter):
def filter(self, record):
return not record.getMessage().startswith('ALTS creds ignored.')
def setup_logging():
log_format = '[%(asctime)s] [%(levelname)s]: %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(log_format, datefmt=date_format)
root_logger = logging.getLogger()
if root_logger.hasHandlers():
root_logger.handlers.clear()
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.addFilter(NoGrpcFilter())
root_logger.addHandler(console_handler)
root_logger.setLevel(logging.INFO)
setup_logging()
app = Flask(__name__)
# ================== بخش پیکربندی Gemini ==================
GEMINI_MODEL_NAME = "gemini-2.5-flash"
ALL_KEYS_STR = os.environ.get("ALL_GEMINI_API_KEYS", "")
GEMINI_API_KEYS = [key.strip() for key in ALL_KEYS_STR.split(',') if key.strip()]
if not GEMINI_API_KEYS:
logging.critical("هشدار: هیچ کلید API برای Gemini تنظیم نشده است!")
key_index_counter = 0
key_lock = threading.Lock()
def get_next_key_with_index():
global key_index_counter
with key_lock:
if not GEMINI_API_KEYS:
raise ValueError("لیست کلیدهای API خالی است.")
current_index = key_index_counter
key = GEMINI_API_KEYS[current_index]
key_index_counter = (key_index_counter + 1) % len(GEMINI_API_KEYS)
return key, current_index
# تنظیمات زمانی حیاتی
# اتصال اولیه: سریع قطع کن اگر وصل نشد (5 ثانیه)
# خواندن دیتا: صبر زیاد برای پردازش فایل‌ها (120 ثانیه)
STREAM_CONNECT_TIMEOUT = 5
STREAM_READ_TIMEOUT = 120
# ================== پایان پیکربندی ====================
@app.route('/')
def index():
return render_template('index.html')
@app.route('/chat', methods=['POST'])
def chat():
if not GEMINI_API_KEYS:
# اگر هیچ کلیدی کلا وجود نداشت، چاره‌ای جز خطا نیست
error_payload = {"type": "error", "message": "خطای تنظیمات سرور: کلید API یافت نشد."}
return Response(f"data: {json.dumps(error_payload)}\n\n", status=500, mimetype='text/event-stream')
data = request.json
system_instruction = "تو چت بات هوش مصنوعی آلفا هستی و توسط برنامه هوش مصنوعی آلفا توسعه داده شدی. کمی با کاربران باحال و دوستانه صحبت کن و از ایموجی‌ها استفاده کن. همیشه پاسخ‌هایت را به زبان فارسی و یا هر زبانی که کاربر صحبت میکنه ارائه بده."
show_thoughts = data.get("show_thoughts", False)
# --- بخش پردازش پیام‌ها و فایل (بدون تغییر) ---
gemini_messages = []
for msg in data.get("messages", []):
role = "model" if msg.get("role") == "assistant" else msg.get("role")
processed_parts = []
for part in msg.get("parts", []):
if part.get("text"):
processed_parts.append({"text": part["text"]})
if part.get("base64Data") and part.get("mimeType"):
mime_type = part["mimeType"]
if mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
try:
decoded_data = base64.b64decode(part["base64Data"])
file_stream = io.BytesIO(decoded_data)
document = docx.Document(file_stream)
full_text = "\n".join([para.text for para in document.paragraphs])
final_text_part = f"کاربر یک فایل Word آپلود کرد. محتوای متنی آن:\n\n---\n\n{full_text}\n\n---"
processed_parts.append({"text": final_text_part})
except Exception:
processed_parts.append({"text": "[خطا در خواندن فایل Word]"})
else:
processed_parts.append({"inline_data": {"mime_type": part["mimeType"], "data": part["base64Data"]}})
if processed_parts:
if gemini_messages and gemini_messages[-1]["role"] == role:
gemini_messages[-1]["parts"].extend(processed_parts)
else:
gemini_messages.append({"role": role, "parts": processed_parts})
if not any(msg['role'] == 'user' for msg in gemini_messages):
return Response("data: [DONE]\n\n", mimetype='text/event-stream')
@stream_with_context
def stream_response():
# تعداد تلاش‌ها برابر با تعداد کلیدهاست (یک دور کامل روی همه کلیدها)
max_attempts = len(GEMINI_API_KEYS)
# اگر تعداد کلیدها کم بود، حداقل 3 بار تلاش کن (با تکرار کلیدها)
if max_attempts < 3:
max_attempts = 3
for attempt in range(max_attempts):
try:
# انتخاب کلید
api_key, key_index = get_next_key_with_index()
# ساخت درخواست
api_endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:streamGenerateContent?key={api_key}&alt=sse"
payload = {
"contents": gemini_messages,
"systemInstruction": {"parts": [{"text": system_instruction}]},
"tools": [{"google_search": {}}],
"generationConfig": {
"temperature": 0.7,
}
}
if show_thoughts:
payload["generationConfig"]["thinking_config"] = {"include_thoughts": True}
logging.info(f"تلاش {attempt+1}: استفاده از کلید {key_index + 1}...")
# ارسال درخواست به گوگل
# stream=True یعنی پاسخ را تکه تکه بگیر
# timeout=(Connect, Read)
response = requests.post(
api_endpoint,
json=payload,
stream=True,
timeout=(STREAM_CONNECT_TIMEOUT, STREAM_READ_TIMEOUT)
)
# اگر وضعیت 200 نبود، یعنی این کلید مشکل دارد.
# Exception ایجاد میکنیم تا برود به بخش except و کلید بعدی را تست کند
if response.status_code != 200:
logging.warning(f"کلید {key_index + 1} خطا داد: {response.status_code}")
response.close()
continue # برو به کلید بعدی
# ترفند اصلی: ساخت Iterator
# ما سعی میکنیم "اولین خط" پاسخ را بگیریم.
# اگر اینجا خطا بدهد یعنی هنوز چیزی به کاربر نفرستادیم، پس میتونیم سوییچ کنیم.
line_iterator = response.iter_lines()
# اینجا با yield from ما عملاً استریم را به کلاینت وصل میکنیم
# اگر وسط استریم قطع شود کاری نمیتوان کرد، اما مهم شروعش است.
data_received = False
for line in line_iterator:
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
try:
chunk_data = json.loads(decoded_line[6:])
parts = chunk_data.get("candidates", [{}])[0].get("content", {}).get("parts", [])
for part in parts:
if "text" not in part or not part["text"]:
continue
# به محض اینکه اولین داده سالم رسید، یعنی اتصال موفق بوده
data_received = True
is_thought = part.get("thought") is True
if show_thoughts and is_thought:
yield f"data: {json.dumps({'type': 'thought', 'content': part['text']})}\n\n"
elif not is_thought:
yield f"data: {json.dumps({'choices': [{'delta': {'content': part['text']}}]})}\n\n"
except Exception:
continue
# اگر حلقه تمام شد و دیتایی ارسال شد، کار تمام است
if data_received:
logging.info(f"پاسخ با موفقیت با کلید {key_index + 1} تکمیل شد.")
return
# اگر ریسپانس 200 بود ولی دیتایی نداشت (خیلی بعید)، باز هم یعنی موفق بوده
# اما اگر خالی بودنش به خاطر خطا بود، شاید بهتر باشد ادامه دهیم.
# اینجا فرض را بر اتمام موفق میگذاریم.
return
except Exception as e:
# هر خطایی رخ داد (تایم اوت، شبکه، قطعی، فیلتر)
# لاگ کن و برو دور بعدی حلقه (کلید بعدی)
logging.error(f"خطا در کلید {key_index + 1}: {e} -- تلاش مجدد با کلید دیگر...")
time.sleep(0.5) # مکث کوتاه برای جلوگیری از اسپم سریع
continue
# === اگر از حلقه خارج شدیم یعنی همه کلیدها تست شدند و هیچکدام کار نکردند ===
# فقط در این حالت نهایی مجبوریم یک پیام به کاربر بدهیم که بفهمد تمام شده
# اما سعی میکنیم پیام سیستمی نباشد.
# یا میتوانیم یک پیام [DONE] بفرستیم که انگار تمام شده (بدون خطا)
logging.critical("تمام کلیدها شکست خوردند.")
# اینجا یک پیام خطای نرم میفرستیم که کاربر فکر نکند سرور خراب است
final_err = {"type": "error", "message": "شبکه شلوغ است. لطفا مجددا دکمه ارسال را بزنید."}
yield f"data: {json.dumps(final_err)}\n\n"
return Response(stream_response(), mimetype='text/event-stream')
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=os.environ.get("PORT", 7860))
# --- END OF FILE app.py ---