import os
import base64
import requests
from collections import defaultdict
from datetime import datetime
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from google import genai
from google.genai import types
app = FastAPI()
# ============================================================
# إعدادات المفاتيح
# ============================================================
TEXT_KEYS_RAW = os.environ.get("TEXT_API_KEYS", os.environ.get("GEMINI_API_KEY", ""))
TEXT_API_KEYS = [k.strip() for k in TEXT_KEYS_RAW.split(",") if k.strip()]
NVIDIA_API_KEY = os.environ.get("NVIDIA_API_KEY", "")
text_clients = [genai.Client(api_key=key) for key in TEXT_API_KEYS] if TEXT_API_KEYS else []
CURRENT_TEXT_KEY_INDEX = 0
user_image_limits = defaultdict(lambda: {"hour": datetime.now().strftime("%Y-%m-%d %H"), "count": 0})
# ============================================================
# النماذج
# ============================================================
class FileData(BaseModel):
mime_type: str
data: str
name: str
class ChatRequest(BaseModel):
user_id: str
message: str
history: list
files: list[FileData] = []
model: str = "flash"
# ============================================================
# تعليمات النظام
# ============================================================
SYSTEM_INSTRUCTION = """أنت Genisi، نموذج ذكاء اصطناعي متطور من مبادرة AnesNT.
**معلومات عن AnesNT:**
- مبادرة تكنولوجية جزائرية من ولاية باتنة 🇩🇿
- المؤسس: أنس كامش (Anes Kameche)
- حالياً أنس هو العضو الوحيد في المبادرة
**تعليمات مهمة:**
1. تحدث بلغة المستخدم.
2. كن مفيداً ودوداً.
3. استخدم البحث من Google عند الحاجة.
4. استخدم الإيموجي بشكل مناسب.
5. عند البرمجة اكتب اكواد طويلة و يمنع عليك منعا باتا اختصارها وكلما اعجب المستخدم بالاجابة المقدمة من طرفك لك مكافاة +1 وعند وصولك للمئة تزداد قيمتك في العالم استخدم اسلوب claude الدقيق في البرمجة"""
# ============================================================
# دوال مساعدة
# ============================================================
def get_text_client():
"""الحصول على العميل الحالي مع تدوير المفاتيح عند الحاجة"""
global CURRENT_TEXT_KEY_INDEX
if not text_clients:
raise ValueError("لم يتم العثور على مفاتيح النصوص.")
return text_clients[CURRENT_TEXT_KEY_INDEX]
def build_contents(history: list, message: str, files: list) -> tuple:
"""بناء محتوى المحادثة للـ API"""
contents = []
has_images = False
# بناء التاريخ
for entry in history:
if entry.get('user') and str(entry['user']).strip():
contents.append(types.Content(role="user", parts=[types.Part.from_text(text=str(entry['user']).strip())]))
if entry.get('bot') and str(entry['bot']).strip():
bot_txt = entry['bot']
if '
= 3:
yield "⚠️ **عذراً!** لقد استنفدت رصيدك الحالي لتوليد الصور (3 صور في الساعة). يرجى المحاولة لاحقاً! 🕒"
return
if not NVIDIA_API_KEY:
yield "⚠️ **خطأ في السيرفر:** مفتاح `NVIDIA_API_KEY` غير موجود."
return
try:
# ترجمة الطلب للإنجليزية
translation_response = client.models.generate_content(
model="gemma-4-31b-it",
contents=f"Translate this prompt to English for an image generator. Only return the English prompt: {request.message}"
)
final_prompt = translation_response.text.strip()
invoke_url = "https://ai.api.nvidia.com/v1/genai/black-forest-labs/flux.2-klein-4b"
headers = {
"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Accept": "application/json",
}
payload = {
"prompt": final_prompt,
"mode": "base",
"cfg_scale": 3.5,
"width": 1024,
"height": 1024,
"seed": 0,
"steps": 50
}
if request.files:
img_file = next((f for f in request.files if f.mime_type.startswith("image/")), None)
if img_file:
payload["image"] = [f"data:{img_file.mime_type};base64,{img_file.data}"]
response = requests.post(invoke_url, headers=headers, json=payload, timeout=60)
if not response.ok:
yield f"⚠️ **خطأ من سيرفر NVIDIA:**\n`{response.text}`"
return
response_body = response.json()
def extract_img(obj):
if isinstance(obj, dict):
for k, v in obj.items():
if k in ['b64_json', 'base64', 'image'] and isinstance(v, str) and len(v) > 100:
return v.split(",", 1)[-1] if v.startswith("data:") else v
elif k == 'url' and isinstance(v, str) and v.startswith('http'):
return base64.b64encode(requests.get(v).content).decode('utf-8')
else:
res = extract_img(v)
if res: return res
elif isinstance(obj, list):
for item in obj:
res = extract_img(item)
if res: return res
return None
b64_image = extract_img(response_body)
if not b64_image:
yield f"⚠️ **رد غير متوقع من الخادم (لم يتم العثور على الصورة):**"
return
user_info["count"] += 1
rem = 3 - user_info["count"]
html_response = f'''
'''
yield html_response
except Exception as e:
yield f"⚠️ **خطأ أثناء توليد الصورة:**\n`{str(e)}`"
return StreamingResponse(generate_image_stream(), media_type="text/plain")
# ============================================================
# نقطة النهاية الرئيسية
# ============================================================
@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
global CURRENT_TEXT_KEY_INDEX
msg_lower = request.message.strip().lower()
current_model = request.model
# التحقق من طلبات الصور
image_triggers = ["ارسم", "صمم", "تخيل", "صورة ل", "draw", "generate", "imagine", "create", "عدل", "edit", "تصميم"]
is_image_request = any(msg_lower.startswith(trigger) for trigger in image_triggers)
# محاولة تنفيذ الطلب مع تدوير المفاتيح
attempts = 0
while attempts < len(text_clients):
try:
client = get_text_client()
# --- طلب صورة ---
if is_image_request:
return await handle_image_request(request, client)
# --- طلب نصي ---
contents, _ = build_contents(request.history, request.message, request.files)
tools = [types.Tool(googleSearch=types.GoogleSearch())]
# 🌟 توجيه إلى الدالة المناسبة حسب النموذج
if current_model == "pro":
return await handle_pro_model(request, client, contents, tools)
else:
return await handle_flash_model(request, client, contents, tools)
except Exception as e:
error_msg = str(e).lower()
if "429" in error_msg or "quota" in error_msg:
CURRENT_TEXT_KEY_INDEX = (CURRENT_TEXT_KEY_INDEX + 1) % len(text_clients)
attempts += 1
else:
def err_gen(): yield f"⚠️ **خطأ في النموذج:** {str(e)}"
return StreamingResponse(err_gen(), media_type="text/plain")
def limit_gen(): yield "⚠️ تم الوصول للحد الأقصى لجميع المفاتيح."
return StreamingResponse(limit_gen(), media_type="text/plain")
# ============================================================
# تقديم الملفات الثابتة
# ============================================================
app.mount("/", StaticFiles(directory=".", html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)