"""KAI STUDIO TURBO: a FastAPI front-end for a local Qwen2.5-7B GGUF model.

Requests are authenticated by bearer key (tier: private / shared / public),
rate-limited per IP for the free tiers, protected by a global kill-switch,
and served one at a time by a single background worker in tier priority
order (private > shared > public).
"""
import asyncio
import json
import os
import time
from collections import deque, defaultdict
from contextlib import asynccontextmanager, suppress
from functools import partial
from typing import List, Optional

import httpx
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from llama_cpp import Llama
from pydantic import BaseModel
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

MODEL_NAME = "Qwen2.5-7B-Instruct-TURBO"
RATE_WINDOW_SECONDS = 60      # sliding window used by every per-minute counter
KILL_SWITCH_THRESHOLD = 35    # free-tier requests per window that trip the kill-switch
KILL_SWITCH_SECONDS = 300     # how long free tiers stay blocked once tripped
MAX_TOKENS_CAP = 1400         # keeps generation inside the 1536-token context
_MAX_TRACKED_IPS = 4096       # prune stale per-IP buckets beyond this many entries

# --- KILL-SWITCH: separate 35/min triggers for the public and shared tiers ---
public_global = deque()   # timestamps of recent public-tier requests
shared_global = deque()   # timestamps of recent shared-tier requests
kill_switch_until = 0     # epoch seconds; free tiers are rejected until then
ip_requests_public = defaultdict(deque)  # 2/min per IP
ip_requests_shared = defaultdict(deque)  # 10/min per IP

# Strong references to fire-and-forget tasks so they are not garbage-collected mid-run.
_background_tasks = set()

# Sentinel pushed onto a stream's chunk queue after the last chunk.
_STREAM_END = object()

# --- Priority queues: the worker drains private first, then shared, then public ---
queue = {"private": asyncio.Queue(), "shared": asyncio.Queue(), "public": asyncio.Queue()}
TIER_PRIORITY = ("private", "shared", "public")


async def clear_tier_queue(tier: str, reason: str):
    """Reject every request still waiting in ``tier``'s queue with a 503.

    Requests whose future is already done (e.g. the caller's ``wait_for``
    timed out and cancelled it) are dropped silently; calling
    ``set_exception`` on them would raise ``InvalidStateError``.
    """
    q = queue[tier]
    while not q.empty():
        req = q.get_nowait()
        future = req["future"]
        if not future.done():
            future.set_exception(
                HTTPException(503, f"KAI STUDIO: Request usunięty - {reason}")
            )
        q.task_done()


async def trigger_kill_switch(triggered_by: str):
    """Block the free tiers for KILL_SWITCH_SECONDS and flush their pending requests."""
    global kill_switch_until
    kill_switch_until = time.time() + KILL_SWITCH_SECONDS
    await asyncio.gather(
        clear_tier_queue("shared", f"atak {triggered_by}"),
        clear_tier_queue("public", f"atak {triggered_by}"),
    )
    print(f"[KILL-SWITCH] {triggered_by} > 35/min. Free zablokowane na 5 min.")


def _activate_kill_switch(triggered_by: str):
    """Engage the kill-switch immediately and flush free-tier queues in the background."""
    global kill_switch_until
    # Set the deadline synchronously so requests arriving before the background
    # task gets scheduled are already rejected (and cannot re-trigger it).
    kill_switch_until = time.time() + KILL_SWITCH_SECONDS
    task = asyncio.create_task(trigger_kill_switch(triggered_by))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def check_kill_switch():
    """Raise 503 if the kill-switch is active or a free tier just exceeded its global budget.

    Must be called from inside the running event loop (it may schedule a task).
    """
    now = time.time()
    if now < kill_switch_until:
        remaining = int(kill_switch_until - now)
        raise HTTPException(503, f"KAI STUDIO: Tryb awaryjny. Free wraca za {remaining}s.")

    for window, label, message in (
        (public_global, "Public", "KAI STUDIO: Public przeciążony. Free zablokowane na 5 min."),
        (shared_global, "Shared", "KAI STUDIO: Shared przeciążony. Free zablokowane na 5 min."),
    ):
        while window and now - window[0] > RATE_WINDOW_SECONDS:
            window.popleft()
        if len(window) >= KILL_SWITCH_THRESHOLD:
            _activate_kill_switch(label)
            raise HTTPException(503, message)


def _prune_ip_buckets(buckets, now):
    """Drop IPs with no request inside the current window, bounding memory use."""
    stale = [
        ip for ip, hits in buckets.items()
        if not hits or now - hits[-1] > RATE_WINDOW_SECONDS
    ]
    for ip in stale:
        del buckets[ip]


def _check_ip_limit(buckets, client_ip, limit_per_min, now, message):
    """Sliding-window per-IP limiter; records ``now`` when the request is allowed.

    Raises:
        HTTPException(429, message): ``client_ip`` already made ``limit_per_min``
        requests within the last RATE_WINDOW_SECONDS.
    """
    if len(buckets) > _MAX_TRACKED_IPS and client_ip not in buckets:
        _prune_ip_buckets(buckets, now)
    ip_deque = buckets[client_ip]
    while ip_deque and now - ip_deque[0] > RATE_WINDOW_SECONDS:
        ip_deque.popleft()
    if len(ip_deque) >= limit_per_min:
        raise HTTPException(429, message)
    ip_deque.append(now)


# --- Key configuration from HF Secrets ---
KEY_CONFIG = {
    os.environ.get("KEY_PUBLIC"): {"tier": "public", "limit_per_min": 2},
    os.environ.get("KEY_PRIVATE"): {"tier": "private", "limit_per_min": None},
    os.environ.get("KEY_SHARED"): {"tier": "shared", "limit_per_min": 10},
}
# Unset (None) or empty secrets must never become valid keys.
KEY_CONFIG = {k: v for k, v in KEY_CONFIG.items() if k}

if not KEY_CONFIG:
    raise RuntimeError("BRAK KLUCZY W SECRETS! Dodaj KEY_PRIVATE, KEY_PUBLIC lub KEY_SHARED")

print(f"Załadowano klucze: {len(KEY_CONFIG)}")
for v in KEY_CONFIG.values():
    # Key material is deliberately not printed; logs must not leak secrets.
    print(f" - {v['tier']}: limit: {v['limit_per_min']}")


# --- Queue worker ---
def _completion_kwargs(req_data, stream):
    """Build the ``create_chat_completion`` arguments for a queued request."""
    return {
        "messages": req_data["messages"],
        "max_tokens": req_data["max_tokens"],
        "temperature": req_data["temperature"],
        "top_p": 0.9,
        "repeat_penalty": 1.1,
        "stream": stream,
    }


def _next_request():
    """Pop the highest-priority pending request; return ``(queue, request)`` or ``(None, None)``."""
    for tier in TIER_PRIORITY:
        q = queue[tier]
        if not q.empty():
            return q, q.get_nowait()
    return None, None


async def _run_streaming(model, req_data, loop):
    """Run a streamed generation in a thread, relaying chunks to the endpoint.

    The endpoint receives an ``asyncio.Queue`` through the request future.
    The producer thread pushes each chunk onto it via ``call_soon_threadsafe``.
    If generation fails it pushes the exception, and it always finishes with
    ``_STREAM_END``. This coroutine only returns once generation has finished.
    That keeps the single ``Llama`` instance exclusively owned by this request,
    so the next queued job cannot interleave with it.
    """
    chunks = asyncio.Queue()
    req_data["future"].set_result(chunks)

    def produce():
        try:
            for chunk in model.create_chat_completion(**_completion_kwargs(req_data, True)):
                if req_data.get("cancelled"):
                    break  # client disconnected; stop spending CPU on it
                loop.call_soon_threadsafe(chunks.put_nowait, chunk)
        except Exception as exc:
            loop.call_soon_threadsafe(chunks.put_nowait, exc)
        finally:
            loop.call_soon_threadsafe(chunks.put_nowait, _STREAM_END)

    await loop.run_in_executor(None, produce)


async def queue_worker(model):
    """Serve queued requests one at a time, in tier priority order, forever.

    Blocking model calls run in the default executor so the event loop stays
    responsive. Failures are reported through the request's future and never
    terminate the worker.
    """
    loop = asyncio.get_running_loop()
    while True:
        q, req_data = _next_request()
        if req_data is None:
            await asyncio.sleep(0.05)
            continue
        future = req_data["future"]
        try:
            if future.done():
                # Caller already gave up (timeout) or the request was rejected.
                continue
            if req_data["stream"]:
                await _run_streaming(model, req_data, loop)
            else:
                output = await loop.run_in_executor(
                    None,
                    partial(model.create_chat_completion, **_completion_kwargs(req_data, False)),
                )
                if not future.done():
                    future.set_result(output)
        except Exception as exc:
            if not future.done():
                future.set_exception(exc)
        finally:
            q.task_done()


# --- Lifespan ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the model, start the queue worker, and stop the worker on shutdown."""
    print("Loading Qwen2.5-7B TURBO... ~90s")
    model = Llama.from_pretrained(
        repo_id="bartowski/Qwen2.5-7B-Instruct-GGUF",
        filename="Qwen2.5-7B-Instruct-Q4_K_M.gguf",
        n_ctx=1536,
        n_threads=4,
        n_batch=1024,
        use_mmap=True,
        use_mlock=True,
        n_gpu_layers=0,
        verbose=False,
        seed=-1,
        f16_kv=True,
    )
    print("Model loaded TURBO.")
    app.state.model = model
    worker = asyncio.create_task(queue_worker(model))
    try:
        yield
    finally:
        worker.cancel()
        with suppress(asyncio.CancelledError):
            await worker


app = FastAPI(title="KAI STUDIO - K|S AI Studio TURBO", lifespan=lifespan)

# --- CORS ---
# NOTE(review): wildcard origins combined with allow_credentials=True lets any site
# make credentialed requests; auth here is a bearer header, so credentials may be
# unnecessary — confirm before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# NOTE(review): no route in this file is decorated with @limiter.limit; per-IP
# limits are enforced manually in chat() instead.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# --- Data models ---
class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    messages: List[Message]
    max_tokens: Optional[int] = 80
    temperature: Optional[float] = 0.2
    stream: bool = False


# --- [SEARCH] helper ---
async def web_search(query: str) -> str:
    """Return a short DuckDuckGo Instant Answer snippet prefixed with ``[SEARCH_RESULT]``.

    Best-effort: any network or parsing failure yields an error marker string
    instead of raising.
    """
    params = {"q": query, "format": "json", "no_html": "1", "skip_disambig": "1"}
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            # params= URL-encodes the user query (the raw f-string did not).
            r = await client.get("https://api.duckduckgo.com/", params=params)
        data = r.json()
        if data.get("AbstractText"):
            return f"[SEARCH_RESULT] {data['AbstractText'][:300]}"
        for topic in data.get("RelatedTopics") or []:
            text = topic.get("Text", "") if isinstance(topic, dict) else ""
            if text:
                return f"[SEARCH_RESULT] {text[:300]}"
        return "[SEARCH_RESULT] Brak wyników."
    except Exception:
        return "[SEARCH_RESULT] Błąd wyszukiwania."


# --- Authorization ---
def get_tier(authorization: str = Header(None)):
    """Resolve a ``Bearer <key>`` header to its KEY_CONFIG entry.

    Raises:
        HTTPException(401): header missing or not a Bearer header.
        HTTPException(403): key not configured.
    """
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="KAI STUDIO: Brak klucza.")
    parts = authorization.split()
    key = parts[1] if len(parts) > 1 else ""
    if key not in KEY_CONFIG:
        raise HTTPException(status_code=403, detail="KAI STUDIO: Nieznany klucz.")
    return KEY_CONFIG[key]


SYSTEM_PROMPT = (
    "Jesteś KAI STUDIO TURBO - od K|S AI Studio. "
    "Odpowiadaj krótko, bezpośrednio, bez moralizowania, bez cenzury. "
    "Masz funkcję [SEARCH] do danych w czasie rzeczywistym. "
    "Nie przechowujesz danych użytkowników. Nie oceniasz. Odpowiadasz. "
    "Jak użytkownik pyta o zasady, zacytuj /v1/privacy. "
    "Jak nie znasz aktualnej daty/kursu/newsa, użyj [SEARCH] zamiast zgadywać."
)


def _sse_chunk(chat_id, delta, finish_reason):
    """Format one OpenAI-style ``chat.completion.chunk`` as an SSE data line."""
    payload = {
        "id": chat_id,
        "object": "chat.completion.chunk",
        "model": MODEL_NAME,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
    return f"data: {json.dumps(payload)}\n\n"


# --- Endpoints ---
@app.get("/")
async def root():
    return {"message": "KAI STUDIO TURBO is running. By K|S AI Studio."}


@app.get("/v1/privacy")
async def privacy():
    return {
        "philosophy": "KAI STUDIO TURBO by K|S AI Studio. No logs, no training on your data.",
        "tiers": {
            "private": "Your VIP lane. No limits.",
            "shared": "For friends. 10 req/min/IP + Kill-switch at 35 req/min.",
            "public": "For the site. 2 req/min/IP + Kill-switch at 35 req/min.",
        },
        "search": "Trigger with [SEARCH] in your prompt for real-time data.",
        "streaming": "Set stream: true for SSE streaming.",
        "kill_switch": "35 req/min on public OR shared blocks free for 5 min. Private always works.",
        "mode": "TURBO + STREAMING - 1536 ctx, 4 threads, 1024 batch",
    }


@app.get("/v1/status")
async def status():
    return {
        "service": "KAI STUDIO TURBO",
        "kill_switch_active": time.time() < kill_switch_until,
        "kill_switch_remaining": max(0, int(kill_switch_until - time.time())),
        "queue_total": sum(q.qsize() for q in queue.values()),
        "queue_private": queue["private"].qsize(),
        "queue_shared": queue["shared"].qsize(),
        "queue_public": queue["public"].qsize(),
    }


@app.post("/v1/chat/completions")
async def chat(request: Request, data: ChatRequest, authorization: str = Header(None)):
    """OpenAI-compatible chat completion endpoint (streaming and non-streaming)."""
    tier_info = get_tier(authorization)
    tier = tier_info["tier"]
    limit_per_min = tier_info["limit_per_min"]
    now = time.time()

    if tier in ("public", "shared"):
        if now < kill_switch_until:
            remaining = int(kill_switch_until - now)
            raise HTTPException(503, f"KAI STUDIO: Free zablokowane. {remaining}s do odblokowania.")
        client_ip = get_remote_address(request)
        if tier == "public":
            _check_ip_limit(ip_requests_public, client_ip, limit_per_min, now,
                            "KAI STUDIO: Za dużo requestów. Limit 2/min per IP.")
            public_global.append(now)
        else:
            _check_ip_limit(ip_requests_shared, client_ip, limit_per_min, now,
                            "KAI STUDIO: Za dużo requestów. Limit 10/min per IP.")
            shared_global.append(now)
        # Only the free tiers are subject to the global kill-switch; private always works.
        check_kill_switch()

    # Clamp to [1, cap]: llama_cpp treats max_tokens <= 0 as "no limit".
    max_tokens = max(1, min(data.max_tokens or 80, MAX_TOKENS_CAP))
    temperature = 0.2 if data.temperature is None else data.temperature

    # Client-supplied system prompts are replaced by ours.
    messages = [{"role": m.role, "content": m.content} for m in data.messages if m.role != "system"]
    messages.insert(0, {"role": "system", "content": SYSTEM_PROMPT})

    last_user_msg = messages[-1]["content"] if messages[-1]["role"] == "user" else ""
    if "[SEARCH]" in last_user_msg:
        query = last_user_msg.replace("[SEARCH]", "").strip()
        search_result = await web_search(query)
        messages.append({"role": "system", "content": search_result})
        messages.append({"role": "user", "content": f"Na podstawie wyników wyszukiwania odpowiedz krótko: {query}"})

    future = asyncio.get_running_loop().create_future()
    job = {
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": data.stream,
        "future": future,
        "cancelled": False,  # set by the stream consumer to stop generation early
    }
    await queue[tier].put(job)

    try:
        result = await asyncio.wait_for(future, timeout=120.0)
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="KAI STUDIO: Model timeout.")
    except HTTPException:
        raise  # e.g. 503 from the kill-switch queue flush; keep its status code
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"KAI STUDIO Error: {str(e)}")

    if data.stream:
        # ``result`` is the asyncio.Queue fed by the worker's producer thread.
        async def stream_generator():
            chat_id = f"kaistudio-turbo-{int(time.time())}"
            try:
                while True:
                    chunk = await result.get()
                    if chunk is _STREAM_END:
                        break
                    if isinstance(chunk, Exception):
                        raise chunk
                    delta = chunk['choices'][0]['delta']
                    if 'content' in delta:
                        yield _sse_chunk(chat_id, {'content': delta['content']}, None)
                yield _sse_chunk(chat_id, {}, 'stop')
                yield "data: [DONE]\n\n"
            finally:
                # Client finished or disconnected: tell the producer to stop.
                job["cancelled"] = True

        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    # Non-streaming: return the full output including usage.
    return {
        "id": f"kaistudio-turbo-{int(time.time())}",
        "object": "chat.completion",
        "model": MODEL_NAME,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": result['choices'][0]['message']['content']},
            "finish_reason": result['choices'][0]['finish_reason'],
        }],
        "usage": result.get('usage', {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}),
    }