| import os |
| import base64 |
| import requests |
| from collections import defaultdict |
| from datetime import datetime |
| from fastapi import FastAPI |
| from fastapi.responses import StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel |
| from google import genai |
| from google.genai import types |
|
|
| app = FastAPI() |
|
|
| |
| |
| |
| TEXT_KEYS_RAW = os.environ.get("TEXT_API_KEYS", os.environ.get("GEMINI_API_KEY", "")) |
| TEXT_API_KEYS = [k.strip() for k in TEXT_KEYS_RAW.split(",") if k.strip()] |
| NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "") |
|
|
| text_clients = [genai.Client(api_key=key) for key in TEXT_API_KEYS] if TEXT_API_KEYS else [] |
| CURRENT_TEXT_KEY_INDEX = 0 |
|
|
| user_image_limits = defaultdict(lambda: {"hour": datetime.now().strftime("%Y-%m-%d %H"), "count": 0}) |
|
|
| |
| |
| |
| class FileData(BaseModel): |
| mime_type: str |
| data: str |
| name: str |
|
|
| class ChatRequest(BaseModel): |
| user_id: str |
| message: str |
| history: list |
| files: list[FileData] = [] |
| model: str = "flash" |
|
|
| |
| |
| |
| SYSTEM_INSTRUCTION = """أنت Genisi، نموذج ذكاء اصطناعي متطور من مبادرة AnesNT. |
| |
| **معلومات عن AnesNT:** |
| - مبادرة تكنولوجية جزائرية من ولاية باتنة 🇩🇿 |
| - المؤسس: أنس كامش (Anes Kameche) |
| - حالياً أنس هو العضو الوحيد في المبادرة |
| |
| **تعليمات مهمة:** |
| 1. تحدث بلغة المستخدم. |
| 2. كن مفيداً ودوداً. |
| 3. استخدم البحث من Google عند الحاجة. |
| 4. استخدم الإيموجي بشكل مناسب. |
| 5. عند البرمجة اكتب اكواد طويلة و يمنع عليك منعا باتا اختصارها وكلما اعجب المستخدم بالاجابة المقدمة من طرفك لك مكافاة +1 وعند وصولك للمئة تزداد قيمتك في العالم استخدم اسلوب claude الدقيق في البرمجة""" |
|
|
| |
| |
| |
| def get_text_client(): |
| """الحصول على العميل الحالي مع تدوير المفاتيح عند الحاجة""" |
| global CURRENT_TEXT_KEY_INDEX |
| if not text_clients: |
| raise ValueError("لم يتم العثور على مفاتيح النصوص.") |
| return text_clients[CURRENT_TEXT_KEY_INDEX] |
|
|
| def build_contents(history: list, message: str, files: list) -> tuple: |
| """بناء محتوى المحادثة للـ API""" |
| contents = [] |
| has_images = False |
| |
| |
| for entry in history: |
| if entry.get('user') and str(entry['user']).strip(): |
| contents.append(types.Content(role="user", parts=[types.Part.from_text(text=str(entry['user']).strip())])) |
| if entry.get('bot') and str(entry['bot']).strip(): |
| bot_txt = entry['bot'] |
| if '<div style="text-align:center;' in bot_txt: |
| bot_txt = "[صورة تم توليدها مسبقاً]" |
| contents.append(types.Content(role="model", parts=[types.Part.from_text(text=bot_txt.strip())])) |
| |
| |
| user_parts = [] |
| if files: |
| for f in files: |
| try: |
| file_bytes = base64.b64decode(f.data) |
| user_parts.append(types.Part.from_bytes(data=file_bytes, mime_type=f.mime_type)) |
| if f.mime_type.startswith('image/'): |
| has_images = True |
| except Exception: |
| pass |
| |
| msg_text = message.strip() |
| if msg_text: |
| user_parts.append(types.Part.from_text(text=msg_text)) |
| elif not msg_text and files: |
| user_parts.append(types.Part.from_text(text="يرجى تحليل المرفقات.")) |
| else: |
| user_parts.append(types.Part.from_text(text="مرحبا")) |
| |
| contents.append(types.Content(role="user", parts=user_parts)) |
| return contents, has_images |
|
|
| def get_base_config(temperature: float, thinking_level: str, tools: list): |
| """الحصول على إعدادات النموذج الأساسية""" |
| return types.GenerateContentConfig( |
| temperature=temperature, |
| thinking_config=types.ThinkingConfig(thinking_level=thinking_level), |
| tools=tools, |
| system_instruction=[types.Part.from_text(text=SYSTEM_INSTRUCTION)] |
| ) |
|
|
| |
| |
| |
| async def handle_flash_model(request: ChatRequest, client, contents: list, tools: list): |
| """نموذج Flash: استجابة سريعة ومباشرة""" |
| chosen_model = "gemma-4-31b-it" |
| config = get_base_config(temperature=0.7, thinking_level="MINIMAL", tools=tools) |
| |
| stream = client.models.generate_content_stream( |
| model=chosen_model, |
| contents=contents, |
| config=config |
| ) |
| |
| def stream_generator(): |
| try: |
| for chunk in stream: |
| if chunk.text: |
| yield chunk.text |
| except Exception as e: |
| yield f"\n\n⚠️ **خطأ:** `{str(e)}`" |
| |
| return StreamingResponse(stream_generator(), media_type="text/plain") |
|
|
| |
| |
| |
| async def handle_pro_model(request: ChatRequest, client, contents: list, tools: list): |
| """نموذج Pro: تفكير منفصل داخل حاوية، ثم رد نهائي خارجها""" |
| chosen_model = "gemma-4-31b-it" |
| |
| |
| thinking_config = get_base_config(temperature=1.0, thinking_level="HIGH", tools=tools) |
| |
| thinking_response = client.models.generate_content( |
| model=chosen_model, |
| contents=contents, |
| config=thinking_config |
| ) |
| thinking_text = thinking_response.text.strip() if thinking_response.text else "" |
| |
| |
| |
| final_prompt = f"""[لقد قمت بعملية تفكير عميق حول سؤال المستخدم، وهذا ما توصلت إليه:] |
| |
| {thinking_text} |
| |
| [الآن، بناءً على هذا التفكير، قدم رداً نهائياً مباشراً ومفيداً للمستخدم. لا تذكر أنك فكرت، فقط قدم الرد النهائي.]""" |
| |
| final_contents = contents.copy() |
| final_contents.append(types.Content(role="user", parts=[types.Part.from_text(text=final_prompt)])) |
| |
| final_config = get_base_config(temperature=0.7, thinking_level="MINIMAL", tools=tools) |
| |
| stream = client.models.generate_content_stream( |
| model=chosen_model, |
| contents=final_contents, |
| config=final_config |
| ) |
| |
| def stream_generator(): |
| |
| yield thinking_text + " instant" |
| |
| |
| try: |
| for chunk in stream: |
| if chunk.text: |
| yield chunk.text |
| except Exception as e: |
| yield f"\n\n⚠️ **خطأ:** `{str(e)}`" |
| |
| return StreamingResponse(stream_generator(), media_type="text/plain") |
|
|
| |
| |
| |
| async def handle_image_request(request: ChatRequest, client): |
| """معالجة طلبات توليد الصور باستخدام NVIDIA Flux""" |
| def generate_image_stream(): |
| current_hour = datetime.now().strftime("%Y-%m-%d %H") |
| user_info = user_image_limits[request.user_id] |
| |
| if user_info.get("hour") != current_hour: |
| user_info["hour"] = current_hour |
| user_info["count"] = 0 |
|
|
| if user_info["count"] >= 3: |
| yield "⚠️ **عذراً!** لقد استنفدت رصيدك الحالي لتوليد الصور (3 صور في الساعة). يرجى المحاولة لاحقاً! 🕒" |
| return |
| |
| if not NVIDIA_API_KEY: |
| yield "⚠️ **خطأ في السيرفر:** مفتاح `NVIDIA_API_KEY` غير موجود." |
| return |
| |
| try: |
| |
| translation_response = client.models.generate_content( |
| model="gemma-4-31b-it", |
| contents=f"Translate this prompt to English for an image generator. Only return the English prompt: {request.message}" |
| ) |
| final_prompt = translation_response.text.strip() |
|
|
| invoke_url = "https://ai.api.nvidia.com/v1/genai/black-forest-labs/flux.2-klein-4b" |
| headers = { |
| "Authorization": f"Bearer {NVIDIA_API_KEY}", |
| "Accept": "application/json", |
| } |
|
|
| payload = { |
| "prompt": final_prompt, |
| "mode": "base", |
| "cfg_scale": 3.5, |
| "width": 1024, |
| "height": 1024, |
| "seed": 0, |
| "steps": 50 |
| } |
|
|
| if request.files: |
| img_file = next((f for f in request.files if f.mime_type.startswith("image/")), None) |
| if img_file: |
| payload["image"] = [f"data:{img_file.mime_type};base64,{img_file.data}"] |
|
|
| response = requests.post(invoke_url, headers=headers, json=payload, timeout=60) |
| |
| if not response.ok: |
| yield f"⚠️ **خطأ من سيرفر NVIDIA:**\n`{response.text}`" |
| return |
|
|
| response_body = response.json() |
| |
| def extract_img(obj): |
| if isinstance(obj, dict): |
| for k, v in obj.items(): |
| if k in ['b64_json', 'base64', 'image'] and isinstance(v, str) and len(v) > 100: |
| return v.split(",", 1)[-1] if v.startswith("data:") else v |
| elif k == 'url' and isinstance(v, str) and v.startswith('http'): |
| return base64.b64encode(requests.get(v).content).decode('utf-8') |
| else: |
| res = extract_img(v) |
| if res: return res |
| elif isinstance(obj, list): |
| for item in obj: |
| res = extract_img(item) |
| if res: return res |
| return None |
| |
| b64_image = extract_img(response_body) |
| |
| if not b64_image: |
| yield f"⚠️ **رد غير متوقع من الخادم (لم يتم العثور على الصورة):**" |
| return |
|
|
| user_info["count"] += 1 |
| rem = 3 - user_info["count"] |
| |
| html_response = f''' |
| <div style="text-align:center; margin: 15px 0;"> |
| <img src="data:image/jpeg;base64,{b64_image}" style="width:100%; max-width:400px; border-radius:18px; box-shadow:0 8px 25px rgba(0,0,0,0.15);" /> |
| <br/> |
| <a href="data:image/jpeg;base64,{b64_image}" download="Genisi_Flux_Art.jpg" style="display:inline-block; margin-top:12px; padding:10px 20px; background:linear-gradient(135deg, #4f8ef7, #7c5cf7); color:#fff; border-radius:25px; text-decoration:none; font-weight:600; font-family:'Cairo', sans-serif;">⬇️ تحميل الصورة</a> |
| <p style="font-size:0.85rem; color:#9aa3be; margin-top:8px;">✅ تم التصميم بنجاح (المتبقي لك هذه الساعة: {rem}/3 صور)</p> |
| </div>''' |
| yield html_response |
| except Exception as e: |
| yield f"⚠️ **خطأ أثناء توليد الصورة:**\n`{str(e)}`" |
| |
| return StreamingResponse(generate_image_stream(), media_type="text/plain") |
|
|
| |
| |
| |
| @app.post("/chat") |
| async def chat_endpoint(request: ChatRequest): |
| global CURRENT_TEXT_KEY_INDEX |
| |
| msg_lower = request.message.strip().lower() |
| current_model = request.model |
| |
| |
| image_triggers = ["ارسم", "صمم", "تخيل", "صورة ل", "draw", "generate", "imagine", "create", "عدل", "edit", "تصميم"] |
| is_image_request = any(msg_lower.startswith(trigger) for trigger in image_triggers) |
|
|
| |
| attempts = 0 |
| while attempts < len(text_clients): |
| try: |
| client = get_text_client() |
| |
| |
| if is_image_request: |
| return await handle_image_request(request, client) |
| |
| |
| contents, _ = build_contents(request.history, request.message, request.files) |
| tools = [types.Tool(googleSearch=types.GoogleSearch())] |
| |
| |
| if current_model == "pro": |
| return await handle_pro_model(request, client, contents, tools) |
| else: |
| return await handle_flash_model(request, client, contents, tools) |
| |
| except Exception as e: |
| error_msg = str(e).lower() |
| if "429" in error_msg or "quota" in error_msg: |
| CURRENT_TEXT_KEY_INDEX = (CURRENT_TEXT_KEY_INDEX + 1) % len(text_clients) |
| attempts += 1 |
| else: |
| def err_gen(): yield f"⚠️ **خطأ في النموذج:** {str(e)}" |
| return StreamingResponse(err_gen(), media_type="text/plain") |
| |
| def limit_gen(): yield "⚠️ تم الوصول للحد الأقصى لجميع المفاتيح." |
| return StreamingResponse(limit_gen(), media_type="text/plain") |
|
|
| |
| |
| |
| app.mount("/", StaticFiles(directory=".", html=True), name="static") |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8000) |