# API / app.py
# Zenkad's picture
# Create app.py
# 298bf1e verified
# raw
# history blame
# 3.54 kB
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import requests, os
from datetime import datetime, date
# =========================
# CONFIG
# =========================
# Hugging Face model id; the inference endpoint URL below is derived from it.
HF_MODEL = "google/mt5-small"
HF_API_URL = "https://api-inference.huggingface.co/models/" + HF_MODEL
# Optional API token read from the environment; empty string when unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Link shown to free-tier users in limit and upsell messages.
SHOPIER_LINK = "https://shopier.com/42153760"
# Maximum number of answered messages per day for non-premium users.
FREE_DAILY_LIMIT = 30
# =========================
# APP
# =========================
app = FastAPI(title="ZenkaMind API")

# CORS is fully open: any origin, any method and any request header is allowed.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# =========================
# SIMPLE MEMORY (RAM)
# =========================
# Usage records keyed by e-mail. Held in a plain module-level dict, so the
# data lives only as long as the process does.
USERS = {}


def get_user(email):
    """Return the usage record for *email*, creating or resetting it as needed.

    A record is a dict with the keys ``count`` (messages answered on
    ``date``), ``date`` (ISO calendar day the count belongs to) and
    ``premium`` (bool). Unknown e-mails get a fresh non-premium record; a
    record whose ``date`` is not today has its counter reset to zero.

    The returned dict is the stored object itself, so callers that mutate it
    update the store directly.
    """
    current_day = date.today().isoformat()
    record = USERS.setdefault(
        email,
        {"count": 0, "date": current_day, "premium": False},
    )
    if record["date"] != current_day:
        # Record is from an earlier day: start the daily quota over.
        record.update(count=0, date=current_day)
    return record
# =========================
# AI CALL
# =========================
def ask_ai(prompt: str):
    """Send *prompt* to the Hugging Face inference endpoint and return its text.

    Args:
        prompt: Full prompt text forwarded as the ``inputs`` field.

    Returns:
        The stripped ``generated_text`` of the first result (an empty string
        when that key is absent), or ``None`` when the request fails (network
        error, timeout, non-200 status), the body is not valid JSON, or the
        payload is not a non-empty list of objects with a string text.

    This function never raises for transport or decoding problems; callers
    only need to check for a falsy result.
    """
    headers = {}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"

    payload = {
        "inputs": prompt,
        "parameters": {
            "max_length": 300,
            "temperature": 0.6,
        },
    }

    try:
        r = requests.post(
            HF_API_URL,
            headers=headers,
            json=payload,
            timeout=20,
        )
    except requests.RequestException:
        # Timeouts and connection failures are reported as "no answer"
        # instead of bubbling up as an unhandled exception.
        return None

    if r.status_code != 200:
        return None

    try:
        data = r.json()
    except ValueError:
        # A 200 response with a non-JSON body.
        return None

    if isinstance(data, list) and data and isinstance(data[0], dict):
        text = data[0].get("generated_text", "")
        if isinstance(text, str):
            return text.strip()
    return None
# =========================
# PROMPT
# =========================
def build_prompt(user_message: str):
    """Embed *user_message* in the fixed Turkish ZenkaMind instruction prompt.

    The result starts with a newline, lists the persona rules one per line,
    then the user's message after ``Kullanıcı: `` and finally an open
    ``ZenkaMind:`` line (followed by a newline) for the model to complete.
    """
    segments = (
        "",  # leading newline
        "Sen ZenkaMind’sin.",
        "- Sadece Türkçe konuş.",
        "- Net, açık ve kararlı cevaplar ver.",
        "- Eğlence botu gibi davranma.",
        "- Kullanıcıyı geliştirmeyi hedefle.",
        "- Gereksiz uzatma yapma.",
        "- Yanlış bir şey sorulursa açıkça yanlış olduğunu söyle.",
        f"Kullanıcı: {user_message}",
        "ZenkaMind:",
        "",  # trailing newline
    )
    return "\n".join(segments)
# =========================
# ROUTES
# =========================
@app.get("/")
async def root():
    """Landing endpoint: report that the API is running."""
    banner = "ZenkaMind API çalışıyor"
    return {"status": banner}
@app.get("/health")
async def health():
    """Health probe: report ``ok``, the configured model id and the server time.

    The timestamp comes from ``datetime.now()`` with no timezone argument, so
    it is a naive ISO-8601 string.
    """
    timestamp = datetime.now().isoformat()
    return dict(status="ok", model=HF_MODEL, time=timestamp)
@app.post("/api/chat")
async def chat(req: Request):
    """Answer one chat message, enforcing the free-tier daily quota.

    Expects a JSON object body with string fields ``email`` and ``message``.

    Returns:
        * HTTP 400 with ``{"error": ...}`` when the body is not a JSON
          object, the e-mail is missing or has no ``@``, or the message is
          empty.
        * ``{"response", "status": "limit"}`` when a non-premium user has
          used up ``FREE_DAILY_LIMIT`` for the day.
        * ``{"response", "status": "error"}`` when no answer could be
          obtained from the model.
        * ``{"response", "remaining"}`` on success; only successful answers
          count against the quota.
    """
    # Imported here so the handler does not rely on a new file-level import.
    from fastapi.concurrency import run_in_threadpool

    try:
        data = await req.json()
    except ValueError:
        # Malformed or undecodable body: a client error, not a server crash.
        return JSONResponse(status_code=400, content={"error": "Geçersiz JSON"})
    if not isinstance(data, dict):
        return JSONResponse(status_code=400, content={"error": "Geçersiz JSON"})

    # Non-string values are treated as missing instead of raising.
    raw_email = data.get("email", "")
    raw_message = data.get("message", "")
    # Strip as well as lowercase so padded variants share one quota record.
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    message = raw_message.strip() if isinstance(raw_message, str) else ""

    # JSONResponse takes (content, status_code); pass both by keyword so the
    # error body and the 400 status cannot be swapped.
    if not email or "@" not in email:
        return JSONResponse(status_code=400, content={"error": "Email gerekli"})
    if not message:
        return JSONResponse(status_code=400, content={"error": "Mesaj boş"})

    user = get_user(email)
    if not user["premium"] and user["count"] >= FREE_DAILY_LIMIT:
        return {
            "response": f"Günlük limit doldu.\n💎 Premium: {SHOPIER_LINK}",
            "status": "limit"
        }

    prompt = build_prompt(message)
    try:
        # ask_ai performs a blocking HTTP call; run it in the threadpool so
        # the event loop keeps serving other requests meanwhile.
        answer = await run_in_threadpool(ask_ai, prompt)
    except (requests.RequestException, ValueError):
        # Transport or decoding failure: same outcome as "no answer".
        answer = None

    if not answer:
        return {
            "response": "Sunucu yoğun. Lütfen biraz sonra tekrar deneyin.",
            "status": "error"
        }

    user["count"] += 1
    if not user["premium"]:
        answer += f"\n\n💎 Premium ile limitsiz kullanım: {SHOPIER_LINK}"
    return {
        "response": answer,
        "remaining": "unlimited" if user["premium"] else max(0, FREE_DAILY_LIMIT - user["count"])
    }