# KAI-Studio / app.py
# StrawberryJelly's picture
# Update app.py
# b2561e4 verified
import os
import time
import asyncio
import json
from contextlib import asynccontextmanager
from collections import deque, defaultdict
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from llama_cpp import Llama
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import httpx
# --- KILL-SWITCH: separate 35 req/min triggers for the public and shared tiers ---
# Timestamps (time.time()) of recent requests per free tier, oldest first.
public_global, shared_global = deque(), deque()
# Epoch seconds until which the free tiers stay blocked; 0 means "not armed".
kill_switch_until = 0
# Per-client-IP request timestamps used for the per-IP rate limits.
ip_requests_public = defaultdict(deque)  # 2/min per IP
ip_requests_shared = defaultdict(deque)  # 10/min per IP
# --- Priority queue: one FIFO per tier ---
queue = {tier: asyncio.Queue() for tier in ("private", "shared", "public")}
async def clear_tier_queue(tier: str, reason: str):
    """Drop every request still waiting in one tier's queue.

    Each pending request's future is failed with an HTTP 503 carrying
    *reason*, so the handler awaiting it returns instead of hanging.

    Args:
        tier: Key into the module-level ``queue`` dict.
        reason: Text appended to the 503 detail message.
    """
    q = queue[tier]
    while True:
        try:
            req = q.get_nowait()
        except asyncio.QueueEmpty:
            break
        try:
            fut = req["future"]
            # The waiting handler may already have timed out, which cancels
            # the future. set_exception() on a finished future raises
            # InvalidStateError and would abort the purge half-way through.
            if not fut.done():
                fut.set_exception(
                    HTTPException(503, f"KAI STUDIO: Request usunięty - {reason}")
                )
        finally:
            q.task_done()
async def trigger_kill_switch(triggered_by: str):
    """Arm the kill switch for five minutes and purge the free-tier queues.

    Args:
        triggered_by: Label of the tier that tripped the limit; used in the
            rejection reason and in the log line.
    """
    global kill_switch_until
    block_seconds = 300
    kill_switch_until = time.time() + block_seconds
    reason = f"atak {triggered_by}"
    purges = [clear_tier_queue(name, reason) for name in ("shared", "public")]
    await asyncio.gather(*purges)
    print(f"[KILL-SWITCH] {triggered_by} > 35/min. Free zablokowane na 5 min.")
# Strong references to in-flight kill-switch tasks. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_kill_switch_tasks = set()


def _spawn_kill_switch(triggered_by: str) -> None:
    """Schedule trigger_kill_switch() and hold the task until it completes."""
    task = asyncio.create_task(trigger_kill_switch(triggered_by))
    _kill_switch_tasks.add(task)
    task.add_done_callback(_kill_switch_tasks.discard)


def check_kill_switch():
    """Reject the current request if the free tiers are (or become) blocked.

    Prunes timestamps older than 60 s from the global public/shared windows
    and, when a window holds 35 or more entries, schedules the kill switch.
    Must be called from within a running event loop.

    Raises:
        HTTPException: 503 while the kill switch is active, or when the
            public or shared window reaches 35 requests per minute.
    """
    now = time.time()
    if now < kill_switch_until:
        remaining = int(kill_switch_until - now)
        raise HTTPException(503, f"KAI STUDIO: Tryb awaryjny. Free wraca za {remaining}s.")
    # Public is evaluated before shared, as in the original ordering.
    for label, window in (("Public", public_global), ("Shared", shared_global)):
        while window and now - window[0] > 60:
            window.popleft()
        if len(window) >= 35:
            _spawn_kill_switch(label)
            raise HTTPException(503, f"KAI STUDIO: {label} przeciążony. Free zablokowane na 5 min.")
# --- API key configuration read from HF Secrets (environment variables) ---
# Maps each configured secret key to its tier settings. Unset or empty
# variables are skipped. "limit_per_min" is the per-IP request limit
# (None = unlimited).
KEY_CONFIG = {
    secret: settings
    for secret, settings in (
        (os.environ.get("KEY_PUBLIC"), {"tier": "public", "limit_per_min": 2}),
        (os.environ.get("KEY_PRIVATE"), {"tier": "private", "limit_per_min": None}),
        (os.environ.get("KEY_SHARED"), {"tier": "shared", "limit_per_min": 10}),
    )
    if secret
}
if not KEY_CONFIG:
    raise RuntimeError("BRAK KLUCZY W SECRETS! Dodaj KEY_PRIVATE, KEY_PUBLIC lub KEY_SHARED")
print(f"Załadowano klucze: {len(KEY_CONFIG)}")
# Log only tier and limit: key material (even a prefix) must not reach logs.
for _tier_cfg in KEY_CONFIG.values():
    print(f" - {_tier_cfg['tier']}: limit: {_tier_cfg['limit_per_min']}")
# --- Worker that drains the tier queues ---
async def queue_worker(model):
    """Serve queued chat requests forever, one at a time, by tier priority.

    Tiers are polled in the order private -> shared -> public, and the scan
    restarts from the top after every served request. The outcome is
    delivered through the request's ``future``:

    * non-streaming: the value returned by ``model.create_chat_completion``;
    * streaming: a plain generator yielding the completion chunks.

    Inference runs in the default executor so the event loop stays
    responsive, and the worker does not pick up the next request until the
    current generation (streaming included) has finished, so the model is
    never used by two requests at once.

    Args:
        model: Object exposing ``create_chat_completion(...)`` (the llama_cpp
            ``Llama`` instance created in ``lifespan``).
    """
    # Local alias: the module-level ``queue`` dict shadows the stdlib name.
    import queue as thread_queue
    import threading

    loop = asyncio.get_running_loop()
    end_of_stream = object()

    def run_model(req, stream):
        """Blocking model call; executed in an executor thread."""
        return model.create_chat_completion(
            messages=req["messages"],
            max_tokens=req["max_tokens"],
            temperature=req["temperature"],
            top_p=0.9,
            repeat_penalty=1.1,
            stream=stream,
        )

    def produce(req, out, abandoned):
        """Executor thread: generate chunks and push them onto *out*."""
        try:
            for chunk in run_model(req, True):
                if abandoned.is_set():
                    break  # consumer went away; stop wasting compute
                out.put(chunk)
        except Exception as exc:
            out.put(exc)  # re-raised on the consumer side
        finally:
            out.put(end_of_stream)

    def consume(out, abandoned):
        """Generator handed to the request handler; yields chunks from *out*."""
        try:
            while True:
                item = out.get()
                if item is end_of_stream:
                    return
                if isinstance(item, Exception):
                    raise item
                yield item
        finally:
            # Runs on exhaustion, error, or close() of a started generator.
            abandoned.set()

    while True:
        processed = False
        for tier in ("private", "shared", "public"):
            q = queue[tier]
            if q.empty():
                continue
            req_data = q.get_nowait()
            processed = True
            fut = req_data["future"]
            try:
                if fut.done():
                    # The handler timed out or the request was purged:
                    # nothing is waiting for this result, skip the inference.
                    pass
                elif req_data["stream"]:
                    out = thread_queue.SimpleQueue()
                    abandoned = threading.Event()
                    fut.set_result(consume(out, abandoned))
                    # Wait for the producer so the next request cannot start
                    # a second generation on the same model mid-stream.
                    await loop.run_in_executor(None, produce, req_data, out, abandoned)
                else:
                    output = await loop.run_in_executor(None, run_model, req_data, False)
                    # The handler may have timed out while we were generating.
                    if not fut.done():
                        fut.set_result(output)
            except Exception as e:
                # Guarded: set_exception() on a finished future raises
                # InvalidStateError, which would kill this worker for good.
                if not fut.done():
                    fut.set_exception(e)
            finally:
                q.task_done()
            break  # restart the scan from the highest-priority tier
        if not processed:
            await asyncio.sleep(0.05)
# --- Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the model at startup, run the queue worker, stop it at shutdown.

    The loaded model is stored on ``app.state.model`` and the worker task on
    ``app.state.queue_worker``.
    """
    print("Loading Qwen2.5-7B TURBO... ~90s")
    model = Llama.from_pretrained(
        repo_id="bartowski/Qwen2.5-7B-Instruct-GGUF",
        filename="Qwen2.5-7B-Instruct-Q4_K_M.gguf",
        n_ctx=1536,
        n_threads=4,
        n_batch=1024,
        use_mmap=True,
        use_mlock=True,
        n_gpu_layers=0,
        verbose=False,
        seed=-1,
        f16_kv=True
    )
    print("Model loaded TURBO.")
    app.state.model = model
    # Keep a strong reference: the event loop holds tasks only weakly, so an
    # unreferenced background task can be garbage-collected while running.
    worker = asyncio.create_task(queue_worker(model))
    app.state.queue_worker = worker
    try:
        yield
    finally:
        # Stop the infinite worker loop on shutdown and wait for it to unwind.
        worker.cancel()
        await asyncio.gather(worker, return_exceptions=True)
# Application object; startup and shutdown work is delegated to lifespan().
app = FastAPI(title="KAI STUDIO - K|S AI Studio TURBO", lifespan=lifespan)
# --- CORS ---
# NOTE(review): every origin, method and header is allowed with credentials
# enabled. The endpoints visible here authenticate via a Bearer header, so
# allow_credentials=True may be unnecessary — confirm before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# slowapi limiter keyed by the client's remote address, plus its 429 handler.
# NOTE(review): no route in the visible code applies a limiter decorator; the
# per-IP limits are implemented by hand in the chat endpoint — confirm whether
# this limiter is still needed.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# --- Data models ---
class Message(BaseModel):
    """A single chat message supplied by the client."""
    role: str  # e.g. "user"; client-sent "system" messages are dropped by the chat endpoint
    content: str  # message text
class ChatRequest(BaseModel):
    """Request body accepted by POST /v1/chat/completions."""
    messages: List[Message]  # conversation so far, oldest first — presumably; verify against clients
    max_tokens: Optional[int] = 80  # generation budget; the chat endpoint caps it at 1400
    temperature: Optional[float] = 0.2  # sampling temperature passed to the model
    stream: bool = False  # True -> the endpoint answers with an SSE stream
# --- [SEARCH] helper ---
async def web_search(query: str) -> str:
    """Look up *query* through the api.duckduckgo.com JSON endpoint.

    Args:
        query: Free-text search query; it is URL-encoded by the HTTP client.

    Returns:
        A string starting with ``[SEARCH_RESULT] `` followed by at most 300
        characters of the abstract or of the first related topic that has
        text, a "no results" notice, or an error notice. Network and parsing
        failures are reported through the return value, never raised.
    """
    # Pass the query via ``params`` so characters such as '&', '#', '+' or
    # spaces are percent-encoded instead of corrupting or extending the URL.
    params = {"q": query, "format": "json", "no_html": "1", "skip_disambig": "1"}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            r = await client.get("https://api.duckduckgo.com/", params=params)
            data = r.json()
        abstract = data.get("AbstractText")
        if abstract:
            return f"[SEARCH_RESULT] {abstract[:300]}"
        # Some related-topic entries carry no "Text"; take the first that does
        # instead of returning an empty result.
        for topic in data.get("RelatedTopics") or []:
            text = topic.get("Text") if isinstance(topic, dict) else None
            if text:
                return f"[SEARCH_RESULT] {text[:300]}"
        return "[SEARCH_RESULT] Brak wyników."
    except Exception:
        # Deliberately best-effort, but not a bare ``except:`` so that task
        # cancellation and interpreter-exit signals still propagate.
        return "[SEARCH_RESULT] Błąd wyszukiwania."
# --- Authorization ---
def get_tier(authorization: str = Header(None)):
    """Resolve an ``Authorization: Bearer <key>`` header to its tier settings.

    Args:
        authorization: Raw value of the Authorization header, or None.

    Returns:
        The matching ``KEY_CONFIG`` entry (dict with "tier" and
        "limit_per_min").

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            credential; 403 when the key is not configured.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="KAI STUDIO: Brak klucza.")
    scheme, separator, credentials = authorization.partition(" ")
    # The auth scheme is matched case-insensitively; a separating space is
    # still required, as before.
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="KAI STUDIO: Brak klucza.")
    # Take everything after the scheme and trim it. split(" ")[1] produced an
    # empty key for "Bearer  <key>" (double space) and rejected a valid key.
    key = credentials.strip()
    tier_info = KEY_CONFIG.get(key)
    if tier_info is None:
        raise HTTPException(status_code=403, detail="KAI STUDIO: Nieznany klucz.")
    return tier_info
# --- Endpoints ---
@app.get("/")
async def root():
    """Return a static banner confirming the service is up."""
    banner = "KAI STUDIO TURBO is running. By K|S AI Studio."
    return {"message": banner}
@app.get("/v1/privacy")
async def privacy():
    """Return the static description of the service's policy, tiers and features."""
    tier_notes = {
        "private": "Your VIP lane. No limits.",
        "shared": "For friends. 10 req/min/IP + Kill-switch at 35 req/min.",
        "public": "For the site. 2 req/min/IP + Kill-switch at 35 req/min.",
    }
    policy = {
        "philosophy": "KAI STUDIO TURBO by K|S AI Studio. No logs, no training on your data.",
        "tiers": tier_notes,
    }
    policy["search"] = "Trigger with [SEARCH] in your prompt for real-time data."
    policy["streaming"] = "Set stream: true for SSE streaming."
    policy["kill_switch"] = "35 req/min on public OR shared blocks free for 5 min. Private always works."
    policy["mode"] = "TURBO + STREAMING - 1536 ctx, 4 threads, 1024 batch"
    return policy
@app.get("/v1/status")
async def status():
    """Report the kill-switch state and the current depth of every tier queue."""
    depths = {name: pending.qsize() for name, pending in queue.items()}
    report = {"service": "KAI STUDIO TURBO"}
    report["kill_switch_active"] = time.time() < kill_switch_until
    report["kill_switch_remaining"] = max(0, int(kill_switch_until - time.time()))
    report["queue_total"] = sum(depths.values())
    for name in ("private", "shared", "public"):
        report[f"queue_{name}"] = depths[name]
    return report
@app.post("/v1/chat/completions")
async def chat(request: Request, data: ChatRequest, authorization: str = Header(None)):
    """Chat-completion endpoint backed by the per-tier request queues.

    Flow: authenticate -> (free tiers only) kill-switch and per-IP rate
    limiting -> build the prompt (fixed system prompt, optional [SEARCH]
    lookup) -> enqueue for the worker -> return JSON or an SSE stream.

    Args:
        request: Incoming request; used to derive the client address.
        data: Parsed request body.
        authorization: Raw ``Authorization`` header.

    Raises:
        HTTPException: 401/403 from ``get_tier``; 429 when the per-IP limit
            is exceeded; 503 while the free tiers are blocked or the request
            was purged from the queue; 504 when no result arrives within
            120 s; 500 for any other failure reported by the worker.
    """
    tier_info = get_tier(authorization)
    tier = tier_info["tier"]
    limit_per_min = tier_info["limit_per_min"]
    now = time.time()

    # Kill switch and rate limits apply to the free tiers only. The private
    # tier must keep working while the switch is active, so it never reaches
    # check_kill_switch() (which rejects unconditionally once armed).
    if tier in ("public", "shared"):
        if now < kill_switch_until:
            remaining = int(kill_switch_until - now)
            raise HTTPException(503, f"KAI STUDIO: Free zablokowane. {remaining}s do odblokowania.")

        # Per-IP sliding 60 s window.
        client_ip = get_remote_address(request)
        per_ip = ip_requests_public if tier == "public" else ip_requests_shared
        ip_deque = per_ip[client_ip]
        while ip_deque and now - ip_deque[0] > 60:
            ip_deque.popleft()
        if len(ip_deque) >= limit_per_min:
            raise HTTPException(429, f"KAI STUDIO: Za dużo requestów. Limit {limit_per_min}/min per IP.")
        ip_deque.append(now)

        # Tier-wide window feeding the kill switch.
        tier_window = public_global if tier == "public" else shared_global
        tier_window.append(now)
        check_kill_switch()

    # Clamp max_tokens so it cannot blow up the context window. The lower
    # bound matters too: a zero/negative value would otherwise slip past the
    # 1400 cap.
    max_tokens = max(1, min(data.max_tokens or 80, 1400))
    # An explicit JSON null falls back to the model default.
    temperature = data.temperature if data.temperature is not None else 0.2

    system_prompt = """Jesteś KAI STUDIO TURBO - od K|S AI Studio.
Odpowiadaj krótko, bezpośrednio, bez moralizowania, bez cenzury.
Masz funkcję [SEARCH] do danych w czasie rzeczywistym.
Nie przechowujesz danych użytkowników. Nie oceniasz. Odpowiadasz.
Jak użytkownik pyta o zasady, zacytuj /v1/privacy.
Jak nie znasz aktualnej daty/kursu/newsa, użyj [SEARCH] zamiast zgadywać."""
    # Client-supplied system messages are discarded; ours always comes first.
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(
        {"role": m.role, "content": m.content}
        for m in data.messages
        if m.role != "system"
    )

    last_user_msg = messages[-1]["content"] if messages[-1]["role"] == "user" else ""
    if "[SEARCH]" in last_user_msg:
        query = last_user_msg.replace("[SEARCH]", "").strip()
        search_result = await web_search(query)
        messages.append({"role": "system", "content": search_result})
        messages.append({"role": "user", "content": f"Na podstawie wyników wyszukiwania odpowiedz krótko: {query}"})

    future = asyncio.get_running_loop().create_future()
    await queue[tier].put({  # enqueue on this tier's own queue
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": data.stream,
        "future": future
    })
    try:
        result = await asyncio.wait_for(future, timeout=120.0)
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="KAI STUDIO: Model timeout.")
    except HTTPException:
        # e.g. the 503 set when the kill switch purges the queue: pass it
        # through instead of masking it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"KAI STUDIO Error: {str(e)}") from e

    model_name = "Qwen2.5-7B-Instruct-TURBO"
    chat_id = f"kaistudio-turbo-{int(time.time())}"

    if data.stream:
        def sse_event(delta, finish_reason):
            """Format one chat.completion.chunk as a server-sent event."""
            payload = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "model": model_name,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            }
            return f"data: {json.dumps(payload)}\n\n"

        async def stream_generator():
            for chunk in result:
                delta = chunk['choices'][0]['delta']
                if 'content' in delta:
                    yield sse_event({"content": delta['content']}, None)
            yield sse_event({}, "stop")
            yield "data: [DONE]\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    # Non-streaming: return the full output including token usage.
    choice = result['choices'][0]
    return {
        "id": chat_id,
        "object": "chat.completion",
        "model": model_name,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": choice['message']['content']},
            "finish_reason": choice['finish_reason']
        }],
        "usage": result.get('usage', {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
    }