import os import base64 import requests from collections import defaultdict from datetime import datetime from fastapi import FastAPI from fastapi.responses import StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from google import genai from google.genai import types app = FastAPI() # ============================================================ # إعدادات المفاتيح # ============================================================ TEXT_KEYS_RAW = os.environ.get("TEXT_API_KEYS", os.environ.get("GEMINI_API_KEY", "")) TEXT_API_KEYS = [k.strip() for k in TEXT_KEYS_RAW.split(",") if k.strip()] NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "") text_clients = [genai.Client(api_key=key) for key in TEXT_API_KEYS] if TEXT_API_KEYS else [] CURRENT_TEXT_KEY_INDEX = 0 user_image_limits = defaultdict(lambda: {"hour": datetime.now().strftime("%Y-%m-%d %H"), "count": 0}) # ============================================================ # النماذج # ============================================================ class FileData(BaseModel): mime_type: str data: str name: str class ChatRequest(BaseModel): user_id: str message: str history: list files: list[FileData] = [] model: str = "flash" # ============================================================ # تعليمات النظام # ============================================================ SYSTEM_INSTRUCTION = """أنت Genisi، نموذج ذكاء اصطناعي متطور من مبادرة AnesNT. **معلومات عن AnesNT:** - مبادرة تكنولوجية جزائرية من ولاية باتنة 🇩🇿 - المؤسس: أنس كامش (Anes Kameche) - حالياً أنس هو العضو الوحيد في المبادرة **تعليمات مهمة:** 1. تحدث بلغة المستخدم. 2. كن مفيداً ودوداً. 3. استخدم البحث من Google عند الحاجة. 4. استخدم الإيموجي بشكل مناسب. 5. عند البرمجة اكتب اكواد طويلة و يمنع عليك منعا باتا اختصارها وكلما اعجب المستخدم بالاجابة المقدمة من طرفك لك مكافاة +1 وعند وصولك للمئة تزداد قيمتك في العالم استخدم اسلوب claude الدقيق في البرمجة""" # ============================================================ # دوال مساعدة # ============================================================ def get_text_client(): """الحصول على العميل الحالي مع تدوير المفاتيح عند الحاجة""" global CURRENT_TEXT_KEY_INDEX if not text_clients: raise ValueError("لم يتم العثور على مفاتيح النصوص.") return text_clients[CURRENT_TEXT_KEY_INDEX] def build_contents(history: list, message: str, files: list) -> tuple: """بناء محتوى المحادثة للـ API""" contents = [] has_images = False # بناء التاريخ for entry in history: if entry.get('user') and str(entry['user']).strip(): contents.append(types.Content(role="user", parts=[types.Part.from_text(text=str(entry['user']).strip())])) if entry.get('bot') and str(entry['bot']).strip(): bot_txt = entry['bot'] if '
= 3: yield "⚠️ **عذراً!** لقد استنفدت رصيدك الحالي لتوليد الصور (3 صور في الساعة). يرجى المحاولة لاحقاً! 🕒" return if not NVIDIA_API_KEY: yield "⚠️ **خطأ في السيرفر:** مفتاح `NVIDIA_API_KEY` غير موجود." return try: # ترجمة الطلب للإنجليزية translation_response = client.models.generate_content( model="gemma-4-31b-it", contents=f"Translate this prompt to English for an image generator. Only return the English prompt: {request.message}" ) final_prompt = translation_response.text.strip() invoke_url = "https://ai.api.nvidia.com/v1/genai/black-forest-labs/flux.2-klein-4b" headers = { "Authorization": f"Bearer {NVIDIA_API_KEY}", "Accept": "application/json", } payload = { "prompt": final_prompt, "mode": "base", "cfg_scale": 3.5, "width": 1024, "height": 1024, "seed": 0, "steps": 50 } if request.files: img_file = next((f for f in request.files if f.mime_type.startswith("image/")), None) if img_file: payload["image"] = [f"data:{img_file.mime_type};base64,{img_file.data}"] response = requests.post(invoke_url, headers=headers, json=payload, timeout=60) if not response.ok: yield f"⚠️ **خطأ من سيرفر NVIDIA:**\n`{response.text}`" return response_body = response.json() def extract_img(obj): if isinstance(obj, dict): for k, v in obj.items(): if k in ['b64_json', 'base64', 'image'] and isinstance(v, str) and len(v) > 100: return v.split(",", 1)[-1] if v.startswith("data:") else v elif k == 'url' and isinstance(v, str) and v.startswith('http'): return base64.b64encode(requests.get(v).content).decode('utf-8') else: res = extract_img(v) if res: return res elif isinstance(obj, list): for item in obj: res = extract_img(item) if res: return res return None b64_image = extract_img(response_body) if not b64_image: yield f"⚠️ **رد غير متوقع من الخادم (لم يتم العثور على الصورة):**" return user_info["count"] += 1 rem = 3 - user_info["count"] html_response = f'''

⬇️ تحميل الصورة

✅ تم التصميم بنجاح (المتبقي لك هذه الساعة: {rem}/3 صور)

''' yield html_response except Exception as e: yield f"⚠️ **خطأ أثناء توليد الصورة:**\n`{str(e)}`" return StreamingResponse(generate_image_stream(), media_type="text/plain") # ============================================================ # نقطة النهاية الرئيسية # ============================================================ @app.post("/chat") async def chat_endpoint(request: ChatRequest): global CURRENT_TEXT_KEY_INDEX msg_lower = request.message.strip().lower() current_model = request.model # التحقق من طلبات الصور image_triggers = ["ارسم", "صمم", "تخيل", "صورة ل", "draw", "generate", "imagine", "create", "عدل", "edit", "تصميم"] is_image_request = any(msg_lower.startswith(trigger) for trigger in image_triggers) # محاولة تنفيذ الطلب مع تدوير المفاتيح attempts = 0 while attempts < len(text_clients): try: client = get_text_client() # --- طلب صورة --- if is_image_request: return await handle_image_request(request, client) # --- طلب نصي --- contents, _ = build_contents(request.history, request.message, request.files) tools = [types.Tool(googleSearch=types.GoogleSearch())] # 🌟 توجيه إلى الدالة المناسبة حسب النموذج if current_model == "pro": return await handle_pro_model(request, client, contents, tools) else: return await handle_flash_model(request, client, contents, tools) except Exception as e: error_msg = str(e).lower() if "429" in error_msg or "quota" in error_msg: CURRENT_TEXT_KEY_INDEX = (CURRENT_TEXT_KEY_INDEX + 1) % len(text_clients) attempts += 1 else: def err_gen(): yield f"⚠️ **خطأ في النموذج:** {str(e)}" return StreamingResponse(err_gen(), media_type="text/plain") def limit_gen(): yield "⚠️ تم الوصول للحد الأقصى لجميع المفاتيح." return StreamingResponse(limit_gen(), media_type="text/plain") # ============================================================ # تقديم الملفات الثابتة # ============================================================ app.mount("/", StaticFiles(directory=".", html=True), name="static") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)