# --- START OF FILE app.py --- import os import json import logging import threading import base64 import io import time from flask import Flask, render_template, request, Response import requests import docx # ================== بخش تنظیمات لاگ‌نویسی ================== class NoGrpcFilter(logging.Filter): def filter(self, record): return not record.getMessage().startswith('ALTS creds ignored.') def setup_logging(): log_format = '[%(asctime)s] [%(levelname)s]: %(message)s' date_format = '%Y-%m-%d %H:%M:%S' formatter = logging.Formatter(log_format, datefmt=date_format) root_logger = logging.getLogger() if root_logger.hasHandlers(): root_logger.handlers.clear() console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.addFilter(NoGrpcFilter()) root_logger.addHandler(console_handler) root_logger.setLevel(logging.INFO) setup_logging() app = Flask(__name__) # ================== بخش پیکربندی Gemini ================== # مدل 2.5 فلش بهترین مدل برای پردازش ویدیو است GEMINI_MODEL_NAME = "gemini-2.5-flash" ALL_KEYS_STR = os.environ.get("ALL_GEMINI_API_KEYS", "") GEMINI_API_KEYS = [key.strip() for key in ALL_KEYS_STR.split(',') if key.strip()] if not GEMINI_API_KEYS: logging.critical("هشدار: هیچ کلید API برای Gemini در Secrets تنظیم نشده است! (ALL_GEMINI_API_KEYS)") key_index_counter = 0 key_lock = threading.Lock() def get_next_key_with_index(): global key_index_counter with key_lock: if not GEMINI_API_KEYS: raise ValueError("لیست کلیدهای API خالی است.") current_index = key_index_counter key = GEMINI_API_KEYS[current_index] key_index_counter = (key_index_counter + 1) % len(GEMINI_API_KEYS) return key, current_index # افزایش تایم‌اوت برای آپلود ویدیو ضروری است STREAM_CONNECT_TIMEOUT = 20 STREAM_READ_TIMEOUT = 120 # ================== پایان بخش پیکربندی ==================== @app.route('/') def index(): return render_template('index.html') @app.route('/chat', methods=['POST']) def chat(): if not GEMINI_API_KEYS: error_payload = {"type": "error", "message": "خطای سرور: هیچ کلید API پیکربندی نشده است."} return Response(f"data: {json.dumps(error_payload)}\n\n", status=500, mimetype='text/event-stream') data = request.json system_instruction = "تو چت بات هوش مصنوعی آلفا هستی. تو توانایی تحلیل دقیق ویدیو، صدا و تصویر و انواع فایل هارا را داری. اگر کاربر ویدیویی فرستاد، تمام فریم‌ها و صدای آن را بررسی کن و به سوالات پاسخ بده و یا بقیه فایل ها. همیشه پاسخ‌هایت را به زبان فارسی و یا هر زبانی که کاربر صحبت میکنه ارائه بده.اگر کسی از تو پرسید که توسط کی ساخته شدی تو میگی من توسط هوش مصنوعی آلفا توسعه داده شدم فقط زمانی که پرسید. در احوال پرسی از شکلک های متفاوت مناسب استفاده کن ." show_thoughts = data.get("show_thoughts", False) # بخش پردازش پیام‌ها و فایل DOCX و ویدیو gemini_messages = [] for msg in data.get("messages", []): role = "model" if msg.get("role") == "assistant" else msg.get("role") processed_parts = [] for part in msg.get("parts", []): if part.get("text"): processed_parts.append({"text": part["text"]}) if part.get("base64Data") and part.get("mimeType"): mime_type = part["mimeType"] if mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": try: decoded_data = base64.b64decode(part["base64Data"]) file_stream = io.BytesIO(decoded_data) document = docx.Document(file_stream) full_text = "\n".join([para.text for para in document.paragraphs]) final_text_part = f"کاربر یک فایل Word آپلود کرد. محتوای متنی آن به شرح زیر است:\n\n---\n\n{full_text}\n\n---" processed_parts.append({"text": final_text_part}) logging.info("فایل DOCX با موفقیت پردازش و متن آن استخراج شد.") except Exception as e: logging.error(f"خطا در پردازش فایل DOCX: {e}") processed_parts.append({"text": "[خطا: امکان پردازش فایل Word وجود نداشت.]"}) else: # *** تغییر اصلی برای ویدیو اینجاست *** # اگر فایل ویدیویی بود، یک راهنمایی متنی اضافه می‌کنیم تا مدل گیج نشود if mime_type.startswith("video/"): processed_parts.append({"text": "این فایل یک ویدیو است. لطفا فریم‌ها، حرکات و صدای داخل ویدیو را با دقت تحلیل کن."}) processed_parts.append({"inline_data": {"mime_type": part["mimeType"], "data": part["base64Data"]}}) if processed_parts: if gemini_messages and gemini_messages[-1]["role"] == role: gemini_messages[-1]["parts"].extend(processed_parts) else: gemini_messages.append({"role": role, "parts": processed_parts}) if not any(msg['role'] == 'user' for msg in gemini_messages): return Response("data: [DONE]\n\n", mimetype='text/event-stream') def stream_response(): # چرخش بین کلیدها در صورت بروز خطا max_attempts = len(GEMINI_API_KEYS) * 2 attempts = 0 last_error = None while attempts < max_attempts: attempts += 1 try: api_key, key_index = get_next_key_with_index() api_endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:streamGenerateContent?key={api_key}&alt=sse" payload = { "contents": gemini_messages, "systemInstruction": {"parts": [{"text": system_instruction}]}, "tools": [{"google_search": {}}], "generationConfig": { "temperature": 0.7, } } if show_thoughts: payload["generationConfig"]["thinking_config"] = { "include_thoughts": True } # درخواست به گوگل با تایم‌اوت کنترل شده with requests.post(api_endpoint, json=payload, stream=True, timeout=(STREAM_CONNECT_TIMEOUT, STREAM_READ_TIMEOUT)) as response: # مدیریت خطاهای خاص گوگل (429, 500, 403) برای سوئیچ روی کلید بعدی if response.status_code == 429 or response.status_code >= 500: logging.warning(f"خطای {response.status_code} با کلید {key_index}. رفتن به کلید بعدی...") last_error = f"Status Code: {response.status_code}" continue if response.status_code == 403: logging.warning(f"کلید {key_index} نامعتبر است (403). رفتن به کلید بعدی...") last_error = "Invalid API Key (403)" continue response.raise_for_status() data_received = False for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith('data: '): try: chunk_data = json.loads(decoded_line[6:]) parts = chunk_data.get("candidates", [{}])[0].get("content", {}).get("parts", []) for part in parts: if "text" not in part or not part["text"]: continue data_received = True is_a_thought = part.get("thought") is True if show_thoughts and is_a_thought: thought_payload = {"type": "thought", "content": part["text"]} yield f"data: {json.dumps(thought_payload)}\n\n" elif not is_a_thought: sse_payload = {"choices": [{"delta": {"content": part["text"]}}]} yield f"data: {json.dumps(sse_payload)}\n\n" except (json.JSONDecodeError, IndexError, KeyError): continue if data_received: return else: last_error = "Empty response" continue except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as e: logging.warning(f"خطای شبکه با کلید {key_index}: {str(e)}. تلاش مجدد...") last_error = str(e) continue except Exception as e: logging.error(f"خطای غیرمنتظره با کلید {key_index}: {e}") last_error = str(e) continue # اگر هیچ کلیدی کار نکرد، پیام خطای ملایم به کاربر بده (نه ارور قرمز) error_message = "سیستم در حال پردازش سنگین است. لطفا مجددا تلاش کنید." error_payload = {"choices": [{"delta": {"content": error_message}}]} yield f"data: {json.dumps(error_payload)}\n\n" return Response(stream_response(), mimetype='text/event-stream') if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=os.environ.get("PORT", 7860)) # --- END OF FILE app.py ---