API / app.py
Zenkad's picture
Create app.py
298bf1e verified
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import requests, os
from datetime import datetime, date
# =========================
# CONFIG
# =========================
# Model identifier and the hosted inference endpoint derived from it.
HF_MODEL = "google/mt5-small"
HF_API_URL = "https://api-inference.huggingface.co/models/" + HF_MODEL

# Optional API token read from the environment; empty string when unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")

# Link included in responses sent to non-premium users.
SHOPIER_LINK = "https://shopier.com/42153760"

# Number of answered messages a non-premium user gets per day.
FREE_DAILY_LIMIT = 30
# =========================
# APP
# =========================
# The ASGI application object that the route decorators below attach to.
app = FastAPI(title="ZenkaMind API")

# CORS is fully open: any origin, any HTTP method, any request header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
# =========================
# SIMPLE MEMORY (RAM)
# =========================
# Per-email usage records, kept only in process memory.
USERS = {}


def get_user(email):
    """Return the usage record for *email*, creating or resetting it first.

    A record is a dict with ``count`` (int), ``date`` (ISO date string) and
    ``premium`` (bool). A new email gets a fresh record with a zero count.
    When the stored date is not today's date, the count is reset to zero and
    the date is advanced; ``premium`` is left as it was.

    The returned dict is the one stored in ``USERS``, so changes made by the
    caller are persisted.
    """
    current_day = date.today().isoformat()
    record = USERS.get(email)

    if record is None:
        # First time this email is seen.
        record = {"count": 0, "date": current_day, "premium": False}
        USERS[email] = record
    elif record["date"] != current_day:
        # A new day has started since the last request: reset the counter.
        record["count"] = 0
        record["date"] = current_day

    return record
# =========================
# AI CALL
# =========================
def ask_ai(prompt: str):
    """Send *prompt* to the Hugging Face inference endpoint and return its text.

    Args:
        prompt: The full prompt string to submit as the model input.

    Returns:
        The stripped ``generated_text`` of the first result, or ``None`` when
        the request fails, the status is not 200, the body is not valid JSON,
        or the payload does not have the expected list-of-dicts shape. The
        result may also be an empty string, so callers should treat any falsy
        value as "no answer".
    """
    # The Authorization header is only sent when a token is configured.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_length": 300,
            "temperature": 0.6,
        },
    }

    try:
        r = requests.post(
            HF_API_URL,
            headers=headers,
            json=payload,
            timeout=20,
        )
    except requests.RequestException:
        # Timeouts and connection errors are reported as "no answer" instead
        # of propagating, because the caller handles a falsy result.
        return None

    if r.status_code != 200:
        return None

    try:
        data = r.json()
    except ValueError:
        # The body was not JSON (e.g. an HTML error page served with 200).
        return None

    if isinstance(data, list) and data and isinstance(data[0], dict):
        text = data[0].get("generated_text", "")
        if isinstance(text, str):
            return text.strip()
    return None
# =========================
# PROMPT
# =========================
def build_prompt(user_message: str):
    """Embed *user_message* in the fixed ZenkaMind instruction template.

    The template is written in Turkish. It lists the assistant's behaviour
    rules, then the user's message on a ``Kullanıcı:`` line, and ends with an
    open ``ZenkaMind:`` line for the model to complete. The returned string
    starts and ends with a newline.
    """
    prompt = f"""
Sen ZenkaMind’sin.
- Sadece Türkçe konuş.
- Net, açık ve kararlı cevaplar ver.
- Eğlence botu gibi davranma.
- Kullanıcıyı geliştirmeyi hedefle.
- Gereksiz uzatma yapma.
- Yanlış bir şey sorulursa açıkça yanlış olduğunu söyle.
Kullanıcı: {user_message}
ZenkaMind:
"""
    return prompt
# =========================
# ROUTES
# =========================
@app.get("/")
async def root():
    """Return a fixed status message showing the API is running."""
    payload = {"status": "ZenkaMind API çalışıyor"}
    return payload
@app.get("/health")
async def health():
    """Return a health report: fixed ``ok`` status, model name, current time.

    The time is the server's local ``datetime.now()`` in ISO 8601 format.
    """
    now_iso = datetime.now().isoformat()
    return {"status": "ok", "model": HF_MODEL, "time": now_iso}
@app.post("/api/chat")
async def chat(req: Request):
    """Answer one chat message, enforcing the free-tier daily limit.

    Expects a JSON object body with string fields ``email`` and ``message``.

    Returns:
        * HTTP 400 with ``{"error": ...}`` when the body is not a JSON object,
          the email is missing or contains no ``@``, or the message is empty.
        * ``{"response", "status": "limit"}`` when a non-premium user has
          already used ``FREE_DAILY_LIMIT`` answers today.
        * ``{"response", "status": "error"}`` when the model gave no answer.
        * ``{"response", "remaining"}`` on success; non-premium answers have
          the premium link appended.
    """
    # Function-local import keeps this handler self-contained; FastAPI
    # re-exports Starlette's thread-pool helper under fastapi.concurrency.
    from fastapi.concurrency import run_in_threadpool

    try:
        data = await req.json()
    except ValueError:
        # Malformed JSON (JSONDecodeError is a ValueError subclass).
        data = None
    if not isinstance(data, dict):
        return JSONResponse(status_code=400, content={"error": "Geçersiz istek"})

    # Non-string values are treated as missing instead of crashing on
    # .lower() / .strip().
    raw_email = data.get("email")
    raw_message = data.get("message")
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    message = raw_message.strip() if isinstance(raw_message, str) else ""

    # JSONResponse takes (content, status_code); keywords avoid swapping them.
    if not email or "@" not in email:
        return JSONResponse(status_code=400, content={"error": "Email gerekli"})
    if not message:
        return JSONResponse(status_code=400, content={"error": "Mesaj boş"})

    user = get_user(email)
    if not user["premium"] and user["count"] >= FREE_DAILY_LIMIT:
        return {
            "response": f"Günlük limit doldu.\n💎 Premium: {SHOPIER_LINK}",
            "status": "limit"
        }

    prompt = build_prompt(message)

    # Reserve the quota slot before awaiting, so concurrent requests from the
    # same user cannot all pass the limit check above. The slot is released
    # again if the model call fails or returns nothing.
    user["count"] += 1
    answer = None
    try:
        # ask_ai performs a blocking HTTP request; run it in a worker thread
        # so the event loop keeps serving other requests meanwhile.
        answer = await run_in_threadpool(ask_ai, prompt)
    finally:
        if not answer:
            user["count"] = max(0, user["count"] - 1)

    if not answer:
        return {
            "response": "Sunucu yoğun. Lütfen biraz sonra tekrar deneyin.",
            "status": "error"
        }

    if not user["premium"]:
        answer += f"\n\n💎 Premium ile limitsiz kullanım: {SHOPIER_LINK}"
    return {
        "response": answer,
        "remaining": "unlimited" if user["premium"] else max(0, FREE_DAILY_LIMIT - user["count"])
    }