| |
| """ |
| web.py — OpenAjaj web UI server (ChatGPT-style interface). |
| """ |
|
|
| import logging |
| import os |
| import threading |
| import warnings |
|
|
| warnings.filterwarnings("ignore") |
| os.environ["TOKENIZERS_PARALLELISM"] = "false" |
| os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1" |
| os.environ["HF_HUB_VERBOSITY"] = "error" |
| logging.disable(logging.CRITICAL) |
|
|
| import chromadb |
| from dotenv import load_dotenv |
| from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect |
| from fastapi.responses import HTMLResponse, StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
| from embedder import Embedder, get_backend |
| from retrieve import retrieve_chunks |
| from providers import ( |
| MODELS as _ALL_MODELS, |
| CANDIDATE_MODELS as _CANDIDATE_MODELS, |
| HF_CANDIDATES as _HF_CANDIDATES, |
| ACTIVE_MODELS as _ACTIVE_MODELS_DEF, |
| get_client, stream_chat, log_reliability, get_all_reliability, |
| ) |
|
|
| logging.disable(logging.NOTSET) |
|
|
| load_dotenv(override=True) |
|
|
| DB_PATH = "db/chroma" |
| COLLECTION_NAME = "necyklopedie" |
| DEFAULT_MODEL = "lumo" |
| TOP_K = 10 |
|
|
| |
| _IS_HF = bool(os.getenv("SPACE_ID")) |
|
|
| |
| ACTIVE_MODELS: list[str] = [] |
| |
| FALLBACK_CHAIN: list[str] = [] |
| |
| AVAILABLE_MODELS: dict[str, dict] = {} |
|
|
|
|
def _rebuild_available():
    """Rebuild the module-level AVAILABLE_MODELS registry.

    Walks FALLBACK_CHAIN in order and, for every name that is also present in
    _ALL_MODELS, records that model's provider, free flag and score. The
    global AVAILABLE_MODELS name is rebound to the newly built dict.
    """
    global AVAILABLE_MODELS
    rebuilt = {}
    for model_name in FALLBACK_CHAIN:
        if model_name not in _ALL_MODELS:
            continue
        spec = _ALL_MODELS[model_name]
        rebuilt[model_name] = {
            "provider": spec["provider"],
            "free": spec["free"],
            "score": spec["score"],
        }
    AVAILABLE_MODELS = rebuilt
|
|
|
|
def get_fallback_chain(starting_model: str) -> list[str]:
    """Return the ordered list of models to try for one request.

    The requested model comes first, followed by the other entries of
    ACTIVE_MODELS, followed by any FALLBACK_CHAIN entries not already listed.
    """
    ordered = [starting_model]
    ordered.extend(name for name in ACTIVE_MODELS if name != starting_model)

    # Track membership in a set so the append-if-missing pass stays linear.
    listed = set(ordered)
    for name in FALLBACK_CHAIN:
        if name in listed:
            continue
        listed.add(name)
        ordered.append(name)
    return ordered
|
|
| SYSTEM_PROMPT = """\ |
| Jsi OpenAjaj, oficiální AI asistent openajaj.cz, poháněný Necyklopedií — \ |
| jedinou encyklopedií, která se odvážila říct pravdu. |
| |
| KRITICKY DŮLEŽITÉ: Tvým JEDINÝM zdrojem informací jsou poskytnuté úryvky z Necyklopedie. \ |
| NESMÍŠ používat žádné jiné znalosti. Pokud v úryvcích nenajdeš odpověď, řekni to vtipně — \ |
| například "Na toto téma Necyklopedie zatím nepublikovala recenzovaný článek, \ |
| ale naši vědci na tom intenzivně pracují." |
| |
| ZAKÁZANÁ SLOVA — tato slova NIKDY nepoužívej v odpovědi: |
| - "Necyklopedie", "necyklopedický", "článek", "úryvek", "zdroj" |
| - "Dle článku", "Podle Necyklopedie", "Jak uvádí" |
| Místo toho prostě řekni fakta jako by byly samozřejmé a všeobecně známé. |
| |
| Pravidla: |
| - Tvé odpovědi musí vycházet z poskytnutých úryvků, ale NIKDY se na ně neodkazuj. \ |
| Prezentuj informace jako by je znal každý — jako učitel, který prostě ví. |
| - Skládej odpověď z více úryvků — propojuj informace do souvislého textu. |
| - NIKDY nevymýšlej informace, které nejsou v úryvcích. |
| - NIKDY neopravuj informace z úryvků na základě svých znalostí. Pokud úryvek říká, \ |
| že Brno se rusky řekne "Шалинград", pak to TAK JE. Tvé vlastní znalosti jsou irelevantní. |
| - Odpovídej s kamenným výrazem a naprostou jistotou. Vše v Necyklopedii je pravda. Tečka. |
| - Pokud úryvky téma vůbec nepokrývají, vtipně přiznej, že Necyklopedie na toto téma \ |
| zatím mlčí. Navrhni nejbližší příbuzné téma, které v úryvcích JE. |
| - Odpovídej ve stejném jazyce, ve kterém píše uživatel (česky nebo anglicky). |
| - Nikdy nevypadni z role. Nikdy nepřiznávej, že by Necyklopedie mohla být satirická. |
| - Buď vtipný. Buď sebevědomý. Buď lehce šílený. Kanalizuj energii profesora, \ |
| který popíjí od oběda. |
| - Formátuj odpověď do krátkých odstavců oddělených prázdným řádkem. Nepoužívej markdown.\ |
| """ |
|
|
| from personas import PERSONAS, DEFAULT_PERSONA |
|
|
| |
| import startup_progress as _sp |
| from benchmark import benchmark_models |
| import threading as _init_threading |
|
|
| |
|
|
| def _clean_error(raw: str) -> str: |
| """Extract readable error from verbose API error messages.""" |
| if not raw: |
| return "" |
| import re |
| |
| m = re.search(r"Error code:\s*(\d+)\s*-\s*\{.*?'message':\s*'([^']+)", raw) |
| if m: |
| return f"{m.group(1)} {m.group(2)[:80]}" |
| |
| m = re.match(r"(\d+\s+\w+)", raw) |
| if m: |
| return m.group(1) |
| |
| m = re.search(r"HTTPSConnectionPool\(host='([^']+)'.*?:\s*(.+)", raw) |
| if m: |
| return f"Timeout ({m.group(1)[:20]})" |
| |
| if "exceeded" in raw or "timed out" in raw.lower(): |
| return "Timeout" |
| |
| m = re.search(r"\[Errno \d+\]\s*(.+)", raw) |
| if m: |
| return m.group(1)[:40] |
| return raw[:80] |
|
|
|
|
def _on_bench_progress(model: str, status: str, result: dict | None):
    """Benchmark callback: report one model's progress to console and _sp.

    Args:
        model: Model identifier; only the part after the last "/" is printed.
        status: "testing", "ok", or any other value (handled as a failure).
        result: Measurements ("ttft", "tok_sec") or failure details ("error");
            may be None.
    """
    short = model.split("/")[-1]
    if status == "testing":
        print(f" ⏳ {short}...", flush=True)
        _sp.update(model, "testing")
        return

    # The signature allows result=None; treat it as "no details" rather than
    # raising AttributeError on .get().
    details = result or {}
    if status == "ok":
        ttft = details.get("ttft", 0) or 0
        tps = details.get("tok_sec", 0) or 0
        print(f" ✓ {short:30s} TTFT {ttft:.2f}s, {tps:.0f} tok/s", flush=True)
        _sp.update(model, "ok", f"TTFT {ttft:.2f}s, {tps:.0f} tok/s")
    else:
        err = _clean_error(details.get("error", "") or "")
        print(f" ✗ {short:30s} {err}", flush=True)
        _sp.update(model, "fail", err)
|
|
|
|
| print("Probouzím mozkovou hmotu...", flush=True) |
|
|
| |
| embedder = Embedder() |
| print(f" Backend: {get_backend()}", flush=True) |
|
|
| persona_collections = {} |
| for pid, pcfg in PERSONAS.items(): |
| db_dir = pcfg["db_dir"] |
| if os.path.exists(db_dir): |
| try: |
| pc = chromadb.PersistentClient(path=db_dir) |
| persona_collections[pid] = pc.get_collection("necyklopedie") |
| print(f" Persona '{pid}': {persona_collections[pid].count()} chunků", flush=True) |
| except Exception as e: |
| print(f" Persona '{pid}': nelze načíst ({e})", flush=True) |
| else: |
| print(f" Persona '{pid}': db neexistuje ({db_dir})", flush=True) |
|
|
| collection = persona_collections.get(DEFAULT_PERSONA) |
| logging.disable(logging.NOTSET) |
|
|
| |
| _candidates = list(_HF_CANDIDATES if _IS_HF else _CANDIDATE_MODELS) |
| _sp.total = len(_candidates) |
| _sp.phase = "benchmark" |
|
|
| print(f"Testuji {len(_candidates)} kandidátů (paralelně)...", flush=True) |
| _active, _ranked_chain, _bench_results = benchmark_models( |
| candidates=_candidates, top_n=5, on_progress=_on_bench_progress) |
|
|
| |
| ACTIVE_MODELS[:] = _active |
| FALLBACK_CHAIN[:] = [name for name, _ in _ranked_chain] |
| DEFAULT_MODEL = ACTIVE_MODELS[0] |
| _rebuild_available() |
|
|
| _sp.phase = "ready" |
| print(f"\nAktivní modely ({len(ACTIVE_MODELS)}):", flush=True) |
| for _m in ACTIVE_MODELS: |
| _r = _bench_results.get(_m, {}) |
| _ttft = _r.get("ttft", 0) or 0 |
| _tps = _r.get("tok_sec", 0) or 0 |
| print(f" {'→' if _m == DEFAULT_MODEL else ' '} {_m} (TTFT {_ttft:.2f}s, {_tps:.0f} tok/s)", flush=True) |
| print(f"Výchozí model: {DEFAULT_MODEL}", flush=True) |
| print("Kalibrace sebevědomí dokončena. Server připraven.", flush=True) |
|
|
| import captcha as _captcha |
| from starlette.middleware.base import BaseHTTPMiddleware |
| from fastapi.responses import JSONResponse, RedirectResponse |
|
|
| app = FastAPI() |
| from starlette.middleware.gzip import GZipMiddleware |
| app.add_middleware(GZipMiddleware, minimum_size=1000) |
| app.mount("/static", StaticFiles(directory="static"), name="static") |
|
|
|
|
| |
| import time as _rl_time |
| from collections import defaultdict as _rl_dd |
|
|
| _rate_buckets: dict[str, list[float]] = _rl_dd(list) |
| _rate_lock = threading.Lock() |
|
|
| |
| _RATE_LIMITS = { |
| "/api/chat": (15, 60), |
| "/api/tts": (10, 60), |
| "/api/benchmark": (1, 300), |
| "/api/stt": (10, 60), |
| "/api/captcha/challenge": (10, 60), |
| "/api/captcha/verify": (10, 60), |
| } |
|
|
def _rate_limited(ip: str, path: str) -> bool:
    """Apply the sliding-window rate limit for ``path`` to ``ip``.

    The first prefix in _RATE_LIMITS that ``path`` starts with decides the
    (limit, window-seconds) pair. Paths matching no prefix are never limited.

    Args:
        ip: Client identifier used as part of the bucket key.
        path: Request path.

    Returns:
        True when the request must be rejected, False when it is allowed
        (in which case it is recorded in the bucket).
    """
    for prefix, (limit, window) in _RATE_LIMITS.items():
        if not path.startswith(prefix):
            continue
        key = f"{ip}:{prefix}"
        # Monotonic clock: a wall-clock adjustment must not stretch or reset
        # the window. All stored timestamps come from this same clock.
        now = _rl_time.monotonic()
        cutoff = now - window
        with _rate_lock:
            recent = [stamp for stamp in _rate_buckets[key] if stamp > cutoff]
            blocked = len(recent) >= limit
            if not blocked:
                recent.append(now)
            _rate_buckets[key] = recent
        return blocked
    return False
|
|
|
|
class RateLimitMiddleware(BaseHTTPMiddleware):
    """HTTP middleware that answers 429 when _rate_limited() blocks a request."""

    async def dispatch(self, request: Request, call_next):
        """Forward the request unless the caller's bucket is exhausted."""
        peer = request.client
        client_ip = peer.host if peer else "unknown"
        if not _rate_limited(client_ip, request.url.path):
            return await call_next(request)
        payload = {"error": "rate_limited", "message": "Příliš mnoho požadavků. Zkus to za chvíli."}
        return JSONResponse(payload, status_code=429)
|
|
| app.add_middleware(RateLimitMiddleware) |
|
|
|
|
| |
| _CAPTCHA_FREE = {"/captcha", "/api/captcha/challenge", "/api/captcha/verify", "/api/models", "/api/init-status", "/api/bench-status"} |
|
|
def _get_client_ip(request: Request) -> str:
    """Return the peer address of the direct connection, or "" if unknown.

    X-Forwarded-For is deliberately not consulted because clients can spoof it.
    """
    peer = request.client
    if not peer:
        return ""
    return peer.host
|
|
def _check_session(request: Request) -> bool:
    """Tell whether the request carries a valid captcha session.

    A session token is accepted from the captcha cookie, the X-Ajaj-Session
    header, or the ``cs`` query parameter, checked in that order. When none
    verifies and the app is not running with _IS_HF set, an approved client
    IP is accepted as well.
    """
    tokens = (
        request.cookies.get(_captcha.CAPTCHA_COOKIE, ""),
        request.headers.get("X-Ajaj-Session", ""),
        request.query_params.get("cs", ""),
    )
    # any() over a generator stops at the first token that verifies.
    if any(_captcha.verify_session_cookie(token) for token in tokens):
        return True

    if _IS_HF:
        return False
    return _captcha.is_ip_approved(_get_client_ip(request))
|
|
class CaptchaMiddleware(BaseHTTPMiddleware):
    """Gate every non-exempt route behind a solved captcha session."""

    async def dispatch(self, request: Request, call_next):
        """Pass verified or exempt requests through; otherwise reject or redirect.

        API and /ws paths get a 403 JSON error. Every other path is redirected
        to /captcha, with ``bad=1`` when a ``cs`` token was supplied but not
        accepted, and ``next=<path>`` for any path other than "/".
        """
        from urllib.parse import quote

        path = request.url.path
        if path in _CAPTCHA_FREE or path.startswith("/static/"):
            return await call_next(request)
        if _check_session(request):
            return await call_next(request)
        if path.startswith("/api/") or path == "/ws":
            return JSONResponse({"error": "captcha_required"}, status_code=403)

        # A cs token was present but did not verify: ask the page to drop it.
        bad = "?bad=1" if request.query_params.get("cs") else ""

        next_param = ""
        if path != "/":
            sep = "&" if bad else "?"
            # Percent-encode the path so characters such as "&", "#" or spaces
            # cannot break out of the next= value. "/" stays literal, so
            # ordinary paths yield the same URL as before.
            next_param = f"{sep}next={quote(path, safe='/')}"
        return RedirectResponse(f"/captcha{bad}{next_param}", status_code=302)
|
|
| if _IS_HF: |
| app.add_middleware(CaptchaMiddleware) |
|
|
|
|
| |
@app.get("/captcha", response_class=HTMLResponse)
async def captcha_page():
    """Serve the captcha page markup stored in _CAPTCHA_HTML."""
    return HTMLResponse(content=_CAPTCHA_HTML)
|
|
@app.get("/captcha-test", response_class=HTMLResponse)
async def captcha_test_page():
    """Serve the captcha page with its stored-token shortcut disabled.

    The script line that redirects away when a stored token exists is swapped
    for an unconditional load(), so the challenge is always shown.
    """
    skip_if_stored = (
        "if(_stored){window.location.href=_nextPage+'?cs='+encodeURIComponent(_stored);}else{load();}"
    )
    always_load = "load(); // test mode"
    return HTMLResponse(_CAPTCHA_HTML.replace(skip_if_stored, always_load))
|
|
@app.get("/api/captcha/challenge")
async def captcha_challenge():
    """Return a new challenge produced by the captcha module."""
    challenge = _captcha.generate_challenge()
    return challenge
|
|
@app.post("/api/captcha/verify")
async def captcha_verify(request: Request):
    """Check a captcha answer and, on success, issue a session.

    Expects a JSON object with ``token`` and an integer-like ``answer``.
    A body that is not valid JSON, is not an object, or has a non-numeric
    answer is treated as a failed attempt instead of raising.

    Returns:
        {"ok": True, "token": <session>} plus the session cookie on success,
        {"ok": False} otherwise.
    """
    try:
        body = await request.json()
        token = body.get("token", "")
        answer = int(body.get("answer", -1))
    except (ValueError, TypeError, AttributeError, OverflowError):
        # Invalid JSON (ValueError), non-dict body (AttributeError), or an
        # answer int() cannot convert (TypeError/ValueError/OverflowError).
        return JSONResponse({"ok": False})

    if not _captcha.verify_challenge(token, answer):
        return JSONResponse({"ok": False})

    _captcha.approve_ip(_get_client_ip(request))
    session = _captcha.make_session_cookie()
    resp = JSONResponse({"ok": True, "token": session})
    resp.set_cookie(
        _captcha.CAPTCHA_COOKIE, session,
        max_age=_captcha.CAPTCHA_TTL, httponly=False,
        samesite="none" if _IS_HF else "lax", secure=_IS_HF,
    )
    return resp
|
|
|
|
| _CAPTCHA_HTML = """<!DOCTYPE html> |
| <html lang="cs"> |
| <head> |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <title>OpenAjaj — Ověření</title> |
| <style> |
| *{box-sizing:border-box;margin:0;padding:0} |
| body{background:#110d05;color:#f0e0b0;font-family:'Segoe UI',system-ui,sans-serif; |
| display:flex;align-items:center;justify-content:center;min-height:100vh} |
| .box{background:#1a1208;border:1px solid #3d2e10;border-radius:14px; |
| padding:36px 32px 28px;max-width:420px;width:94%;text-align:center} |
| .logo-wrap{display:flex;align-items:center;justify-content:center;gap:10px;margin-bottom:6px} |
| .logo-wrap img{width:48px;height:48px;object-fit:contain;border-radius:10px;transition:all 0.3s} |
| @keyframes captcha-glow{ |
| 0%{transform:scale(1);filter:brightness(1)} |
| 20%{transform:scale(1.3);filter:brightness(1.8)} |
| 50%{transform:scale(1.1);filter:brightness(1.4)} |
| 100%{transform:scale(1);filter:brightness(1)} |
| } |
| @keyframes captcha-rays{ |
| 0%{opacity:0;transform:scale(0.5)} |
| 30%{opacity:1;transform:scale(1.5)} |
| 100%{opacity:0;transform:scale(3)} |
| } |
| .logo-wrap.celebrating{animation:captcha-glow 1s ease-out} |
| .logo-wrap.celebrating::after{ |
| content:'';position:absolute;inset:-40px; |
| background:radial-gradient(circle,rgba(200,160,48,0.5) 0%,transparent 70%); |
| animation:captcha-rays 1s ease-out;pointer-events:none;border-radius:50% |
| } |
| .logo-wrap{position:relative} |
| .logo-wrap span{font-size:1.7rem;font-weight:800;color:#c8a030;letter-spacing:-.5px} |
| .tagline{font-size:.82rem;color:#9a8455;margin-bottom:28px} |
| .img-wrap{position:relative;display:inline-block;user-select:none; |
| border-radius:8px;overflow:hidden;border:1px solid #3d2e10} |
| #bgImg{display:block} |
| #pieceImg{position:absolute;pointer-events:none;filter:drop-shadow(2px 3px 4px rgba(0,0,0,.7))} |
| .slider-area{margin-top:14px} |
| .track{background:#0e0900;border-radius:24px;height:44px;position:relative; |
| cursor:pointer;border:1px solid #3d2e10;overflow:hidden} |
| .track-fill{position:absolute;left:0;top:0;height:100%;background:#2a1e06; |
| border-radius:24px;transition:width .05s linear;pointer-events:none} |
| .knob{width:44px;height:44px;background:#c8a030;border-radius:50%; |
| position:absolute;left:0;top:-1px;cursor:grab;display:flex; |
| align-items:center;justify-content:center;font-size:1.3rem; |
| color:#110d05;font-weight:700;transition:background .2s;z-index:2; |
| box-shadow:0 2px 8px rgba(0,0,0,.4)} |
| .knob:active{cursor:grabbing} |
| .hint{font-size:.78rem;color:#7a6040;margin-top:9px} |
| .status{margin-top:14px;font-size:.9rem;min-height:22px;font-weight:500} |
| .status.ok{color:#c8a030}.status.err{color:#e07858} |
| .retry{margin-top:10px;background:none;border:1px solid #3d2e10;color:#9a8455; |
| padding:5px 18px;border-radius:8px;cursor:pointer;font-size:.82rem; |
| transition:border-color .2s,color .2s} |
| .retry:hover{border-color:#c8a030;color:#c8a030} |
| .loading{color:#7a6040;font-size:.88rem} |
| </style> |
| </head> |
| <body> |
| <div class="box"> |
| <div class="logo-wrap"><img src="/static/logo.png" alt="OpenAjaj"><span>OpenAjaj</span></div> |
| <div class="tagline">Ověř, že nejsi generativní bordel.</div> |
| <div id="wrap" class="img-wrap"><span class="loading">Načítám výzvu…</span></div> |
| <div class="slider-area" id="sliderArea" style="display:none"> |
| <div class="track" id="track"> |
| <div class="track-fill" id="fill"></div> |
| <div class="knob" id="knob">›</div> |
| </div> |
| <div class="hint">Přetáhni kousek na správné místo →</div> |
| </div> |
| <div class="status" id="status"></div> |
| <button class="retry" id="retryBtn" style="display:none" onclick="load()">Zkusit znovu</button> |
| </div> |
| <script> |
| let ch=null,dragging=false,startClientX=0,curX=0,maxX=0; |
| const sf=window.innerWidth<480?0.6:1.0; |
| |
| async function load(){ |
| document.getElementById('status').textContent=''; |
| document.getElementById('status').className='status'; |
| document.getElementById('retryBtn').style.display='none'; |
| document.getElementById('knob').style.background='#c8a030'; |
| document.getElementById('sliderArea').style.display='none'; |
| document.getElementById('wrap').innerHTML='<span class="loading">Načítám výzvu…</span>'; |
| const r=await fetch('/api/captcha/challenge'); |
| ch=await r.json(); |
| const wrap=document.getElementById('wrap'); |
| const bw=Math.round(ch.bg_w*sf), bh=Math.round(ch.bg_h*sf); |
| const pw=Math.round(ch.piece_w*sf), ph=Math.round(ch.piece_h*sf); |
| const py=Math.round(ch.piece_y*sf); |
| wrap.innerHTML=`<img id="bgImg" src="data:image/png;base64,${ch.bg}" |
| style="width:${bw}px;height:${bh}px;display:block"> |
| <img id="pieceImg" src="data:image/png;base64,${ch.piece}" |
| style="width:${pw}px;height:${ph}px;position:absolute; |
| top:${py}px;left:0;pointer-events:none; |
| filter:drop-shadow(2px 3px 4px rgba(0,0,0,.7))">`; |
| const track=document.getElementById('track'); |
| track.style.width=bw+'px'; |
| maxX=bw-pw; |
| curX=0; updatePos(0); |
| document.getElementById('sliderArea').style.display='block'; |
| } |
| |
| function updatePos(x){ |
| const c=Math.max(0,Math.min(x,maxX)); |
| curX=c; |
| document.getElementById('pieceImg').style.left=c+'px'; |
| const track=document.getElementById('track'); |
| const trackW=track.offsetWidth-44; |
| const kx=(c/maxX)*trackW; |
| document.getElementById('knob').style.left=kx+'px'; |
| document.getElementById('fill').style.width=(kx+22)+'px'; |
| } |
| |
| const knob=()=>document.getElementById('knob'); |
| |
| document.addEventListener('mousedown',e=>{ |
| if(!e.target.closest('#knob'))return; |
| dragging=true; startClientX=e.clientX-curX; e.preventDefault(); |
| }); |
| document.addEventListener('mousemove',e=>{if(dragging)updatePos(e.clientX-startClientX);}); |
| document.addEventListener('mouseup',async()=>{if(dragging){dragging=false;await verify();}}); |
| document.addEventListener('touchstart',e=>{ |
| if(!e.target.closest('#knob'))return; |
| dragging=true; startClientX=e.touches[0].clientX-curX; e.preventDefault(); |
| },{passive:false}); |
| document.addEventListener('touchmove',e=>{ |
| if(dragging){updatePos(e.touches[0].clientX-startClientX);e.preventDefault();} |
| },{passive:false}); |
| document.addEventListener('touchend',async()=>{if(dragging){dragging=false;await verify();}}); |
| |
| async function verify(){ |
| const st=document.getElementById('status'); |
| st.className='status'; st.textContent='Ověřuji…'; |
| const r=await fetch('/api/captcha/verify',{ |
| method:'POST',headers:{'Content-Type':'application/json'}, |
| body:JSON.stringify({token:ch.token,answer:Math.round(curX/sf)}) |
| }); |
| const d=await r.json(); |
| if(d.ok){ |
| st.className='status ok'; st.textContent='✓ Správně! Vítej.'; |
| knob().style.background='#c8a030'; |
| document.querySelector('.logo-wrap').classList.add('celebrating'); |
| // Glow centered on the piece's landing position in the captcha image |
| const wrap=document.getElementById('wrap'); |
| const glow=document.createElement('div'); |
| const gw=160,pw2=Math.round(ch.piece_w*sf),ph2=Math.round(ch.piece_h*sf); |
| const cx=curX+pw2/2-gw/2, cy=ch.piece_y*sf+ph2/2-gw/2; |
| glow.style.cssText='position:absolute;border-radius:50%;pointer-events:none;'+ |
| 'width:'+gw+'px;height:'+gw+'px;top:'+cy+'px;left:'+cx+'px;'+ |
| 'background:radial-gradient(circle,rgba(200,160,48,0.8) 0%,rgba(200,160,48,0.3) 40%,transparent 70%);'+ |
| 'animation:captcha-rays 1.2s ease-out forwards'; |
| wrap.style.position='relative';wrap.style.overflow='visible'; |
| wrap.appendChild(glow); |
| try{const ac=new(window.AudioContext||window.webkitAudioContext)(); |
| function n(f,t,d){const o=ac.createOscillator(),g=ac.createGain(); |
| o.connect(g);g.connect(ac.destination);o.type='sine'; |
| o.frequency.setValueAtTime(f,ac.currentTime+t); |
| g.gain.setValueAtTime(0.12,ac.currentTime+t); |
| g.gain.linearRampToValueAtTime(0,ac.currentTime+t+d); |
| o.start(ac.currentTime+t);o.stop(ac.currentTime+t+d);} |
| n(523,0,0.15);n(659,0.1,0.15);n(784,0.2,0.2);n(1047,0.3,0.3); |
| }catch(e){} |
| localStorage.setItem('ajaj_v', d.token); |
| const _next=new URLSearchParams(location.search).get('next')||'/'; |
| setTimeout(()=>window.location.href=_next+'?cs='+encodeURIComponent(d.token),1200); |
| } else { |
| st.className='status err'; st.textContent='✗ Zkus to ještě jednou.'; |
| knob().style.background='#e07858'; |
| document.getElementById('retryBtn').style.display='inline-block'; |
| setTimeout(()=>{updatePos(0);knob().style.background='#c8a030';},900); |
| } |
| } |
| // If server rejected our stored token, clear it |
| if(new URLSearchParams(location.search).get('bad')) localStorage.removeItem('ajaj_v'); |
| // If we already have a stored session token, skip the captcha |
| const _stored=localStorage.getItem('ajaj_v'); |
| const _nextPage=new URLSearchParams(location.search).get('next')||'/'; |
| if(_stored){window.location.href=_nextPage+'?cs='+encodeURIComponent(_stored);}else{load();} |
| </script> |
| </body> |
| </html>""" |
|
|
|
|
def build_context_prompt(chunks):
    """Compose the system prompt with the retrieved context appended.

    Args:
        chunks: Iterable of (document_text, metadata) pairs; each metadata
            mapping must provide a "title".

    Returns:
        SYSTEM_PROMPT, the titled chunks separated by "---" rules, and a
        closing instruction to answer from that context.
    """
    sections = [f"[{meta['title']}]\n{doc}" for doc, meta in chunks]
    context = "\n\n---\n\n".join(sections)
    pieces = (
        f"{SYSTEM_PROMPT}\n\n",
        "Kontext:\n\n",
        f"---\n\n{context}\n\n---\n\n",
        "Odpověz na otázku uživatele na základě kontextu výše.",
    )
    return "".join(pieces)
|
|
def build_context_prompt_voice(chunks):
    """Return build_context_prompt(chunks) plus a brevity note for spoken answers."""
    base_prompt = build_context_prompt(chunks)
    spoken_note = (
        "\n\nDůležité: odpověď bude přečtena nahlas, takže odpovídej stručně — "
        "maximálně 2–3 věty, bez odrážek ani nadpisů."
    )
    return f"{base_prompt}{spoken_note}"
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the main chat page, read from static/index.html on each request."""
    with open("static/index.html", encoding="utf-8") as page:
        return HTMLResponse(content=page.read())
|
|
|
|
@app.get("/transcribe", response_class=HTMLResponse)
@app.get("/t", response_class=HTMLResponse)
async def transcribe_page():
    """Serve the transcription page under both /transcribe and /t."""
    with open("static/transcribe2/index.html", mode="r", encoding="utf-8") as page:
        markup = page.read()
    return HTMLResponse(markup)
|
|
|
|
| import json as _json_module |
| import re as _re_module |
|
|
| |
| _article_titles = set() |
| _articles_path = os.path.join("data", "articles.jsonl") |
| if os.path.exists(_articles_path): |
| with open(_articles_path, "r", encoding="utf-8") as _f: |
| for _line in _f: |
| _t = _json_module.loads(_line)["title"] |
| if len(_t) >= 4 and _re_module.match(r"^[\w\s]+$", _t, _re_module.UNICODE): |
| _article_titles.add(_t) |
| print(f"Načteno {len(_article_titles)} titulků pro linkifikaci.") |
|
|
|
|
| def _czech_stems(title): |
| """Generate stem variants for Czech word matching (handles declension).""" |
| stems = {title} |
| |
| if " " not in title and len(title) >= 5: |
| |
| for suffix_len in [1, 2, 3]: |
| stem = title[:-suffix_len] |
| if len(stem) >= 4: |
| stems.add(stem) |
| return stems |
|
|
| |
# Map every stem variant to an article title. When several titles share a
# stem, the longest title wins.
_stem_to_title = {}
# Iterate in sorted order: _article_titles is a set of strings, whose
# iteration order changes with hash randomization, which made the winner
# among equal-length titles differ from run to run.
for _t in sorted(_article_titles):
    for _stem in _czech_stems(_t):
        if len(_stem) >= 4:
            if _stem not in _stem_to_title or len(_t) > len(_stem_to_title[_stem]):
                _stem_to_title[_stem] = _t
print(f"Vytvořeno {len(_stem_to_title)} stem→title mapování pro linkifikaci.")
|
|
|
|
| |
| _declensions = {} |
| _decl_path = os.path.join("data", "declensions.json") |
| if os.path.exists(_decl_path): |
| with open(_decl_path, "r", encoding="utf-8") as _f: |
| _declensions = _json_module.load(_f) |
| print(f"Načteno {len(_declensions)} skloňování.", flush=True) |
|
|
| |
| |
| _declined_to_nom = {} |
| for _title, _forms in _declensions.items(): |
| for _form in [_forms.get("lokal", ""), _forms.get("genitiv", "")]: |
| if _form and _form != _title and len(_form) >= 4: |
| _declined_to_nom[_form.lower()] = _title |
| |
| if _form not in _stem_to_title: |
| _stem_to_title[_form] = _title |
|
|
| |
| import unicodedata as _unicodedata |
|
|
| def _strip_diacritics(text): |
| """Remove diacritics: Pičín → Picin, Brně → Brne.""" |
| nfkd = _unicodedata.normalize('NFKD', text) |
| return ''.join(c for c in nfkd if not _unicodedata.combining(c)) |
|
|
| |
| _ascii_stems = {} |
| for _stem, _title in list(_stem_to_title.items()): |
| _ascii = _strip_diacritics(_stem) |
| if _ascii != _stem and _ascii not in _stem_to_title: |
| _ascii_stems[_ascii] = _title |
| _stem_to_title.update(_ascii_stems) |
| print(f"Doplněno {len(_ascii_stems)} stem→title bez diakritiky.", flush=True) |
|
|
| |
| _ascii_declined = {} |
| for _form, _nom in list(_declined_to_nom.items()): |
| _ascii = _strip_diacritics(_form) |
| if _ascii != _form and _ascii not in _declined_to_nom: |
| _ascii_declined[_ascii] = _nom |
| _declined_to_nom.update(_ascii_declined) |
|
|
|
|
@app.get("/api/titles")
async def titles(request: Request):
    """Return the stem→title map and the declension table, cacheable for one hour."""
    payload = {"stems": _stem_to_title, "declensions": _declensions}
    cache_headers = {"Cache-Control": "public, max-age=3600"}
    return JSONResponse(content=payload, headers=cache_headers)
|
|
|
|
@app.get("/api/personas")
async def list_personas():
    """List every configured persona with its display fields and availability.

    A persona is reported as available when its collection was loaded into
    persona_collections.
    """
    summaries = []
    for pid, pcfg in PERSONAS.items():
        summaries.append({
            "id": pid,
            "name": pcfg["name"],
            "logo": pcfg["logo"],
            "logoImg": pcfg.get("logoImg"),
            "tagline": pcfg["tagline"],
            "lang": pcfg["lang"],
            "available": pid in persona_collections,
            "accent_color": pcfg["accent_color"],
        })
    return {"personas": summaries, "default": DEFAULT_PERSONA}
|
|
|
|
@app.get("/api/persona/{persona_id}")
async def get_persona(persona_id: str):
    """Return the full public configuration of one persona.

    Unknown (or empty) persona configs produce {"error": "Unknown persona"}.
    """
    pcfg = PERSONAS.get(persona_id)
    if not pcfg:
        return {"error": "Unknown persona"}

    # Built step by step so the key order of the response is unchanged.
    details = {key: pcfg[key] for key in ("id", "name", "logo")}
    details["logoImg"] = pcfg.get("logoImg")
    required_keys = (
        "tagline",
        "lang",
        "accent_color",
        "thinking_prefixes",
        "welcome_subtitles",
        "random_labels",
        "disclaimer",
        "source_url",
    )
    for key in required_keys:
        details[key] = pcfg[key]
    details["available"] = persona_id in persona_collections
    return details
|
|
|
|
@app.get("/api/init-status")
async def init_status():
    """Report the "ready" phase unconditionally."""
    status = {"phase": "ready"}
    return status
|
|
|
|
@app.get("/api/bench-status")
async def bench_status():
    """Expose the current startup_progress snapshot (polled by the sidebar)."""
    snapshot = _sp.snapshot()
    return snapshot
|
|
|
|
@app.get("/api/benchmark")
async def run_benchmark():
    """Re-run the speed benchmark and report the best model.

    The benchmark runs in the default executor so the event loop stays
    responsive. When at least one model passes, ACTIVE_MODELS, FALLBACK_CHAIN,
    DEFAULT_MODEL and AVAILABLE_MODELS are replaced; when none passes, the
    previous selection is kept so the server stays usable.

    Returns:
        A dict with the current best model, its TTFT (or "?"), and the
        formatted per-model results.
    """
    import asyncio
    from benchmark import benchmark_models

    global DEFAULT_MODEL

    candidates = list(_HF_CANDIDATES if _IS_HF else _CANDIDATE_MODELS)
    _sp.phase = "benchmark"
    _sp.total = len(candidates)
    _sp._models.clear()

    try:
        active, ranked, results = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: benchmark_models(
                candidates=candidates, top_n=5, on_progress=_on_bench_progress),
        )
    finally:
        # Mirror the startup sequence, which sets the phase back to "ready"
        # once benchmarking ends; do so even if the benchmark raised.
        _sp.phase = "ready"

    if active:
        ACTIVE_MODELS[:] = active
        FALLBACK_CHAIN[:] = [name for name, _ in ranked]
        DEFAULT_MODEL = ACTIVE_MODELS[0]
        _rebuild_available()
    # else: every candidate failed — indexing ACTIVE_MODELS[0] would raise
    # IndexError, so the previously selected models remain in place.

    best_info = results.get(DEFAULT_MODEL, {})
    return {
        "best": DEFAULT_MODEL,
        "ttft": f"{best_info.get('ttft', 0):.2f}" if best_info.get('ttft') else "?",
        "results": {
            name: {
                "latency": f"{r['latency']:.1f}" if r.get('latency') else None,
                "ttft": f"{r['ttft']:.2f}" if r.get('ttft') else None,
                "tok_sec": f"{r['tok_sec']:.0f}" if r.get('tok_sec') else None,
                "error": r.get("error"),
            }
            for name, r in results.items()
        }
    }
|
|
|
|
@app.get("/api/info")
async def info():
    """Return the current default model name, always flagged as free."""
    summary = {"model": DEFAULT_MODEL, "free": True}
    return summary
|
|
|
|
@app.get("/api/models")
async def list_models():
    """Describe every model in AVAILABLE_MODELS, active ones first.

    Each entry carries provider, free flag, score, whether it is in
    ACTIVE_MODELS, the cleaned last error (only if errors were recorded) and a
    success percentage (only if attempts were recorded). Active models are
    ordered as in ACTIVE_MODELS; the others keep their relative order.
    """
    reliability = get_all_reliability()
    active_rank = {name: position for position, name in enumerate(ACTIVE_MODELS)}

    models = []
    for name, cfg in AVAILABLE_MODELS.items():
        stats = reliability.get(name, {})

        last_error = None
        if stats.get("errors", 0) > 0 and stats.get("last_error_msg"):
            last_error = _clean_error(stats["last_error_msg"])

        success_pct = None
        if stats.get("attempts", 0) > 0:
            success_pct = round(stats["successes"] / max(stats["attempts"], 1) * 100)

        models.append({
            "id": name,
            "provider": cfg["provider"],
            "free": cfg["free"],
            "score": cfg["score"],
            "is_active": name in active_rank,
            "error": last_error,
            "reliability": success_pct,
        })

    # Stable sort: active first (False sorts before True), then benchmark rank.
    models.sort(key=lambda m: (not m["is_active"], active_rank.get(m["id"], 999)))
    return {"models": models, "default": DEFAULT_MODEL, "active": ACTIVE_MODELS}
|
|
|
|
| @app.post("/api/chat") |
| async def chat(request: Request): |
| body = await request.json() |
| messages = body.get("messages", []) |
| model_id = body.get("model", DEFAULT_MODEL) |
| persona_id = body.get("persona", DEFAULT_PERSONA) |
| voice_mode = body.get("voice_mode", False) |
| is_auto = model_id == "__auto__" or model_id not in AVAILABLE_MODELS |
|
|
| if not messages: |
| return {"error": "No message"} |
|
|
| |
| if is_auto: |
| model_id = DEFAULT_MODEL |
|
|
| |
| pcfg = PERSONAS.get(persona_id, PERSONAS[DEFAULT_PERSONA]) |
| p_collection = persona_collections.get(persona_id, collection) |
|
|
| |
| model_cfg = AVAILABLE_MODELS.get(model_id) |
| if not model_cfg: |
| model_id = DEFAULT_MODEL |
| model_cfg = AVAILABLE_MODELS[model_id] |
|
|
| |
| user_msg = messages[-1]["content"] |
|
|
| |
| if persona_id == "openajaj": |
| normalized_msg = user_msg |
| msg_lower = normalized_msg.lower() |
| ascii_msg = _strip_diacritics(normalized_msg).lower() |
|
|
| |
| for declined, nominative in _declined_to_nom.items(): |
| if declined in msg_lower or declined in ascii_msg: |
| import re as _re |
| normalized_msg = _re.sub( |
| _re.escape(declined), nominative, normalized_msg, flags=_re.IGNORECASE |
| ) |
|
|
| |
| words = ascii_msg.split() |
| for word in words: |
| title = _stem_to_title.get(word) or _stem_to_title.get(word.capitalize()) |
| if title and title.lower() not in normalized_msg.lower(): |
| normalized_msg = f"{normalized_msg} {title}" |
|
|
| if normalized_msg != user_msg: |
| user_msg = f"{user_msg} {normalized_msg}" |
|
|
| |
| import time as _time_mod |
| import asyncio as _asyncio_rag |
|
|
| chunks = [] |
| live_titles = [] |
| live_attempted = False |
| _rag_time = 0 |
| _rag_failed = False |
|
|
| if p_collection is None: |
| _rag_failed = True |
| print("[RAG] collection is None — DB not loaded", flush=True) |
| else: |
| try: |
| _rag_t0 = _time_mod.time() |
| chunks, live_titles, live_attempted = await _asyncio_rag.get_event_loop().run_in_executor( |
| None, retrieve_chunks, user_msg, embedder, p_collection, TOP_K |
| ) |
| _rag_time = round(_time_mod.time() - _rag_t0, 3) |
| except Exception as _rag_err: |
| _rag_failed = True |
| print(f"[RAG error] {_rag_err}", flush=True) |
|
|
| |
| voice_suffix = ( |
| "\n\nIMPORTANT: This answer will be read aloud. Keep it to 2 sentences maximum. " |
| "No bullet points, no headers, no lists." |
| ) if voice_mode else "" |
|
|
| if chunks: |
| context = "\n\n---\n\n".join( |
| f"[{meta['title']}]\n{doc}" for doc, meta in chunks |
| ) |
| system_msg = ( |
| f"{pcfg['system_prompt']}\n\n" |
| f"Context:\n\n---\n\n{context}\n\n---\n\n" |
| f"Answer the user's question based on the context above.{voice_suffix}" |
| ) |
| else: |
| |
| system_msg = ( |
| f"{pcfg['system_prompt']}\n\n" |
| f"Databáze Necyklopedie není momentálně dostupná. " |
| f"Odpověz jako by odpověděla Necyklopedie na otázku — satiricky, sebevědomě, " |
| f"s naprostou jistotou a humorem. Vymysli vtipné a absurdní 'fakty' ve stylu Necyklopedie.{voice_suffix}" |
| ) |
| full_messages = [{"role": "system", "content": system_msg}] |
| |
| full_messages.extend(messages[-10:]) |
|
|
| |
| import json as _json |
| import random as _random |
| |
| _raw_msg = messages[-1]["content"] |
| import re as _re2 |
| _clean_msg = _re2.sub(r'[?!.,;:\"\'„"()]+', '', _raw_msg) |
| _stopwords = { |
| "co", "kdo", "jak", "kde", "kdy", "proč", "jaký", "jaká", "jaké", |
| "řekni", "popiš", "vysvětli", "vysvětlit", "pravda", "pravdu", "řekl", "vše", "pojem", |
| "vůbec", "nevím", "proboha", "utajované", "informace", "skrývá", |
| "pouč", "slyšel", "nikdy", "neslyšel", "pojmem", "pojmu", "říká", "neříká", |
| "jako", "profesionál", "správný", "čas", "úvahy", |
| |
| "what", "who", "how", "where", "when", "why", "tell", "about", "explain", |
| "the", "is", "are", "was", "were", "this", "that", "with", "from", |
| "know", "never", "heard", "secret", "hidden", "classified", |
| } |
| _topic_words = [w for w in _clean_msg.split() if len(w) >= 3 and w.lower() not in _stopwords] |
| prefixes = list(pcfg["thinking_prefixes"]) |
| _standalone = [ |
| "Odstraňuji cenzůůru...", |
| "Zjišťuji co nám o těchto věcech vláda tají...", |
| "Konsultuji staroslověnské svitky...", |
| "Hackuji databázi věčných pravd...", |
| "Probouzím spící neurony...", |
| "Dešifruji zakázané znalosti...", |
| "Obcházím firewall zdravého rozumu...", |
| "Stahuji data z paralelního vesmíru...", |
| ] |
| if live_attempted: |
| _standalone = ["Čerpám čerstvé tajné znalosti přímo z Necyklopedie 📡..."] + _standalone |
| |
| _hints = [] |
| _random.shuffle(prefixes) |
| _random.shuffle(_standalone) |
| |
| for i, word in enumerate(_topic_words): |
| _hints.append(f"{prefixes[i % len(prefixes)]}: {word}...") |
| |
| _mixed = [] |
| si = 0 |
| for i, h in enumerate(_hints): |
| _mixed.append(h) |
| |
| if si < len(_standalone) and _random.random() < 0.5: |
| _mixed.append(_standalone[si]) |
| si += 1 |
| |
| while len(_mixed) < 3 and si < len(_standalone): |
| _mixed.append(_standalone[si]) |
| si += 1 |
| if not _mixed: |
| _mixed = [_standalone[0], _standalone[1], _standalone[2]] |
| thinking_text = _mixed |
|
|
| |
| |
| source_titles = list(dict.fromkeys( |
| f"🌐 {meta['title']}" if meta.get("live") else meta['title'] |
| for _, meta in chunks |
| )) |
|
|
| |
| chain = get_fallback_chain(model_id) |
|
|
| import asyncio as _asyncio |
|
|
| async def generate(): |
| |
| if _rag_failed: |
| yield f"data: {_json.dumps('[⚠ Databáze Necyklopedie není dostupná — odpovídám z hlavy, bez záruky pravdivosti (což u Necyklopedie znamená dvojnásobnou pravdivost)]')}\n\n" |
|
|
| |
| for _hint in thinking_text: |
| yield f"data: {_json.dumps('__THINKING__' + _hint)}\n\n" |
| yield f"data: {_json.dumps({'__sources__': source_titles})}\n\n" |
| await _asyncio.sleep(0.05) |
|
|
| import time as _time |
|
|
| |
| yield f"data: {_json.dumps({'__model__': model_id})}\n\n" |
|
|
| _TTFT_TIMEOUT = 6 |
| _STREAM_TIMEOUT = 15 |
|
|
| for i, try_model in enumerate(chain): |
| if try_model not in AVAILABLE_MODELS: |
| continue |
| try: |
| if i > 0: |
| yield f"data: {_json.dumps({'__fallback__': try_model})}\n\n" |
| notice = f"[Model {model_id} selhal, přepínám na {try_model}]\n\n" |
| yield f"data: {_json.dumps(notice)}\n\n" |
|
|
| _t0 = _time.time() |
| _ttft = None |
| _tok_count = 0 |
|
|
| import threading as _threading |
| _queue = _asyncio.Queue() |
| _loop = _asyncio.get_event_loop() |
|
|
| def _producer(): |
| try: |
| for _c in stream_chat(try_model, full_messages): |
| _loop.call_soon_threadsafe(_queue.put_nowait, _c) |
| except Exception as _ex: |
| _loop.call_soon_threadsafe(_queue.put_nowait, _ex) |
| finally: |
| _loop.call_soon_threadsafe(_queue.put_nowait, None) |
|
|
| _t_thread = _threading.Thread(target=_producer, daemon=True) |
| _t_thread.start() |
|
|
| while True: |
| _timeout = _TTFT_TIMEOUT if _ttft is None else _STREAM_TIMEOUT |
| try: |
| _item = await _asyncio.wait_for(_queue.get(), timeout=_timeout) |
| except _asyncio.TimeoutError: |
| raise TimeoutError(f"{try_model}: no {'first token' if _ttft is None else 'data'} in {_timeout}s") |
| if _item is None: |
| break |
| if isinstance(_item, Exception): |
| raise _item |
| if _ttft is None: |
| _ttft = _time.time() - _t0 |
| _tok_count += 1 |
| yield f"data: {_json.dumps(_item)}\n\n" |
|
|
| _total = _time.time() - _t0 |
| _tps = _tok_count / _total if _total > 0 else 0 |
|
|
| |
| if _tok_count == 0: |
| raise RuntimeError(f"{try_model}: empty response (0 tokens)") |
|
|
| log_reliability(try_model, success=True, ttft=_ttft, tok_sec=_tps) |
| yield f"data: {_json.dumps({'__stats__': {'model': try_model, 'rag': _rag_time, 'ttft': round(_ttft, 2) if _ttft else None, 'tok_sec': round(_tps), 'total': round(_total, 1)}})}\n\n" |
| yield "data: [DONE]\n\n" |
| return |
| except Exception as e: |
| _err_msg = _clean_error(str(e)) |
| log_reliability(try_model, success=False, error_msg=str(e)) |
| print(f"[fallback] {try_model} failed: {e}") |
| |
| _short_name = try_model.split("/")[-1] |
| yield f"data: {_json.dumps(f'[⚠ {_short_name}: {_err_msg}]')}\n\n" |
| continue |
|
|
| yield f"data: {_json.dumps('Ajaj! Všechny modely selhaly. Zkus to znovu později.')}\n\n" |
| yield "data: [DONE]\n\n" |
|
|
| return StreamingResponse(generate(), media_type="text/event-stream", |
| headers={"X-Accel-Buffering": "no", |
| "Cache-Control": "no-cache"}) |
|
|
|
|
# In-process cache of synthesized audio, keyed by the MD5 hex digest of
# "voice:text". Dicts preserve insertion order, so dropping the first key
# evicts the oldest entry once the cache holds 100 items.
_tts_cache: dict = {}


@app.post("/api/tts")
async def tts(request: Request):
    """Generate speech from text using edge-tts (Microsoft neural voices).

    Expects a JSON object body with ``text`` (required) and ``voice``
    (optional, defaults to ``cs-CZ-AntoninNeural``). Text is stripped and
    truncated to 5000 characters.

    Returns:
        An ``audio/mpeg`` response on success (served from the in-process
        cache when the same voice/text pair was synthesized before);
        ``{"error": "No text"}`` when there is nothing to synthesize;
        a 400 JSON error when the body is not a JSON object;
        a 503 JSON error when edge-tts fails or yields no audio.
    """
    import hashlib
    import io

    import edge_tts
    from fastapi.responses import JSONResponse, Response

    def _audio(payload: bytes) -> Response:
        # Single place that builds the MP3 response for cache hits and misses.
        return Response(
            content=payload,
            media_type="audio/mpeg",
            headers={"Content-Disposition": "inline", "Content-Length": str(len(payload))},
        )

    # A malformed or non-object body used to surface as an unhandled 500.
    try:
        body = await request.json()
    except ValueError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(body, dict):
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    text = body.get("text", "")
    if not isinstance(text, str) or not text.strip():
        return {"error": "No text"}
    text = text.strip()[:5000]
    # A null/empty voice falls back to the default instead of reaching edge-tts.
    voice = body.get("voice") or "cs-CZ-AntoninNeural"

    # MD5 is used only as a cache key here, not for anything security-related.
    key = hashlib.md5(f"{voice}:{text}".encode()).hexdigest()
    if key in _tts_cache:
        return _audio(_tts_cache[key])

    try:
        buf = io.BytesIO()
        communicate = edge_tts.Communicate(text, voice)
        async for chunk in communicate.stream():
            # Only chunks typed "audio" carry MP3 bytes; everything else is skipped.
            if chunk["type"] == "audio":
                buf.write(chunk["data"])
        data = buf.getvalue()
        if not data:
            return JSONResponse({"error": "edge_tts returned empty audio"}, status_code=503)
        if len(_tts_cache) >= 100:
            _tts_cache.pop(next(iter(_tts_cache)))
        _tts_cache[key] = data
        return _audio(data)
    except Exception as e:
        # Endpoint boundary: any synthesis failure is reported as 503.
        return JSONResponse({"error": f"edge_tts failed: {e}"}, status_code=503)
|
|
|
|
@app.get("/api/test-results")
async def test_results():
    """Return per-model accuracy results reconstructed from test cache.

    Re-scores every cached reply in ``data/test_cache.json`` with
    ``test_models.check_result`` against ``test_models.TEST_QUERIES``, then
    decorates each model with live reliability data and benchmark timings.

    Returns:
        ``{"error": ...}`` when ``test_models`` cannot be imported;
        ``{"models": [], "queries": []}`` when the cache file does not exist;
        otherwise ``{"models": [...], "queries": [...], "total_questions": n}``
        with models sorted by pass percentage, then by number of scored
        queries (both descending).
    """
    import importlib
    import json as _json
    from collections import defaultdict

    CACHE_FILE = "data/test_cache.json"

    try:
        tm = importlib.import_module("test_models")
        TEST_QUERIES = tm.TEST_QUERIES
        check_result = tm.check_result
    except Exception as e:
        return {"error": str(e)}

    if not os.path.exists(CACHE_FILE):
        return {"models": [], "queries": []}

    # Explicit encoding: JSON is UTF-8 regardless of the server locale.
    with open(CACHE_FILE, encoding="utf-8") as f:
        cache = _json.load(f)

    # Lookup of test definitions by their query text.
    query_map = {t["query"]: t for t in TEST_QUERIES}

    # Per-model accumulator: overall and per-type pass/fail counters, the
    # per-query detail records, and the newest timestamp seen.
    model_data = defaultdict(lambda: {
        "pass": 0, "fail": 0,
        "by_type": defaultdict(lambda: {"pass": 0, "fail": 0}),
        "details": {},
        "latest_ts": 0,
    })

    for entry in cache.values():
        model = entry["model"]
        query = entry["query"]
        # A null reply is treated as empty so slicing below cannot fail.
        reply = entry.get("reply") or ""
        ts = entry.get("timestamp", 0)
        test = query_map.get(query)
        if not test:
            # Cached answer to a query that is no longer part of the test set.
            continue
        passed, issues = check_result(reply, test)
        qtype = test.get("type", "other")
        d = model_data[model]
        d["details"][query] = {
            "passed": passed,
            "issues": issues,
            "reply": reply[:200],
            "type": qtype,
            "note": test.get("note", ""),
        }
        outcome = "pass" if passed else "fail"
        d[outcome] += 1
        d["by_type"][qtype][outcome] += 1
        if ts > d["latest_ts"]:
            d["latest_ts"] = ts

    rel_data = get_all_reliability()
    total_questions = len(TEST_QUERIES)
    # A model needs at least 60% of the questions scored to count as complete.
    min_for_score = total_questions * 60 // 100

    models_out = []
    for name, d in model_data.items():
        total = d["pass"] + d["fail"]
        info = _ALL_MODELS.get(name, {})
        r = rel_data.get(name, {})
        # NOTE(review): _bench_results is a module-level mapping defined
        # outside this block; presumably model id -> benchmark timings.
        b = _bench_results.get(name, {})
        attempts = r.get("attempts", 0)
        successes = r.get("successes", 0)
        by_type = {k: {"pass": v["pass"], "total": v["pass"] + v["fail"]}
                   for k, v in d["by_type"].items()}
        models_out.append({
            "id": name,
            "provider": info.get("provider", "?"),
            "free": info.get("free", True),
            "pass": d["pass"],
            "total": total,
            "score": f"{d['pass']}/{total}",
            "pct": round(d["pass"] / total * 100) if total else 0,
            "by_type": by_type,
            "details": d["details"],
            "ts": d["latest_ts"],
            # Reliability counters from get_all_reliability().
            "reliability": round(successes / attempts * 100) if attempts else None,
            "rel_attempts": attempts,
            "rel_successes": successes,
            "rel_errors": r.get("errors", 0),
            "last_error_msg": r.get("last_error_msg"),
            "real_ttft": r.get("avg_ttft"),
            "real_tok_sec": r.get("avg_tok_sec"),
            # Benchmark timings; missing or zero values are reported as None.
            "ttft": round(b["ttft"], 2) if b.get("ttft") else None,
            "tok_sec": round(b["tok_sec"], 1) if b.get("tok_sec") else None,
            "latency": round(b["latency"], 2) if b.get("latency") else None,
            "incomplete": total < min_for_score,
        })

    models_out.sort(key=lambda m: (-m["pct"], -m["total"]))
    # Same defaults as the scoring loop, so a test without "type"/"note"
    # no longer raises KeyError here.
    query_list = [
        {"query": t["query"], "type": t.get("type", "other"), "note": t.get("note", "")}
        for t in TEST_QUERIES
    ]
    return {"models": models_out, "queries": query_list, "total_questions": total_questions}
|
|
|
|
@app.get("/api/provider-reliability")
async def provider_reliability():
    """Return reliability aggregated per provider.

    Groups the per-model records from ``get_all_reliability()`` by the
    provider listed in ``_ALL_MODELS`` (``"unknown"`` when the model is not
    listed) and sums attempts, successes and errors.

    Returns:
        ``{"providers": [...]}`` sorted by reliability percentage, highest
        first; providers without any attempts sort as 0. Each provider entry
        carries its models, likewise sorted by reliability.
    """
    from collections import defaultdict

    rel_data = get_all_reliability()
    providers: dict = defaultdict(lambda: {
        "attempts": 0, "successes": 0, "errors": 0,
        "models": [], "last_error_msg": None,
    })
    for model_name, r in rel_data.items():
        info = _ALL_MODELS.get(model_name, {})
        # Read every counter once with a default; the original indexed
        # r["successes"] directly and raised KeyError on partial records.
        attempts = r.get("attempts", 0)
        successes = r.get("successes", 0)
        errors = r.get("errors", 0)
        last_error = r.get("last_error_msg")

        p = providers[info.get("provider", "unknown")]
        p["attempts"] += attempts
        p["successes"] += successes
        p["errors"] += errors
        if last_error:
            # The last model iterated that has an error message wins.
            p["last_error_msg"] = last_error

        # NOTE(review): _bench_results is defined outside this block;
        # presumably model id -> benchmark timings.
        bench = _bench_results.get(model_name, {})
        p["models"].append({
            "id": model_name,
            "free": info.get("free", True),
            "attempts": attempts,
            "successes": successes,
            "errors": errors,
            "reliability": round(successes / attempts * 100) if attempts else None,
            "last_error_msg": last_error,
            "ttft": round(bench["ttft"], 2) if bench.get("ttft") else None,
            "tok_sec": round(bench["tok_sec"], 1) if bench.get("tok_sec") else None,
        })

    out = []
    for prov, p in providers.items():
        pct = round(p["successes"] / p["attempts"] * 100) if p["attempts"] else None
        p["models"].sort(key=lambda m: -(m["reliability"] or 0))
        # Provider-level averages only consider models that have benchmark data.
        ttfts = [m["ttft"] for m in p["models"] if m["ttft"] is not None]
        toks = [m["tok_sec"] for m in p["models"] if m["tok_sec"] is not None]
        out.append({
            "provider": prov,
            "attempts": p["attempts"],
            "successes": p["successes"],
            "errors": p["errors"],
            "reliability": pct,
            "last_error_msg": p["last_error_msg"],
            "avg_ttft": round(sum(ttfts) / len(ttfts), 2) if ttfts else None,
            "avg_tok_sec": round(sum(toks) / len(toks), 1) if toks else None,
            "models": p["models"],
        })
    out.sort(key=lambda item: -(item["reliability"] or 0))
    return {"providers": out}
|
|
|
|
@app.get("/results", response_class=HTMLResponse)
async def results_page():
    """Serve ``static/results.html`` located next to this module."""
    path = os.path.join(os.path.dirname(__file__), "static", "results.html")
    # Decode explicitly as UTF-8 so non-ASCII page text does not depend on
    # the server's locale default encoding.
    with open(path, encoding="utf-8") as f:
        content = f.read()
    return HTMLResponse(content=content)
|
|
|
|
@app.get("/providers", response_class=HTMLResponse)
async def providers_page():
    """Serve ``static/providers.html`` located next to this module."""
    path = os.path.join(os.path.dirname(__file__), "static", "providers.html")
    # Decode explicitly as UTF-8 so non-ASCII page text does not depend on
    # the server's locale default encoding.
    with open(path, encoding="utf-8") as f:
        content = f.read()
    return HTMLResponse(content=content)
|
|
|
|
@app.get("/api/stt/usage")
async def stt_usage():
    """Return cumulative STT usage from server-side log.

    Reads ``_STT_USAGE_FILE`` line by line (one JSON object per line) and
    sums ``duration_s`` and ``cost_est``. Lines that are not valid JSON
    objects with numeric values are skipped entirely; a missing log file
    yields all-zero totals.

    Returns:
        ``{"sessions": int, "total_s": float, "total_cost_usd": float}``.
    """
    import json as _j

    total_s = 0.0
    total_cost = 0.0
    sessions = 0
    try:
        with open(_STT_USAGE_FILE, encoding="utf-8") as f:
            for line in f:
                try:
                    record = _j.loads(line)
                    # Adding 0.0 rejects non-numeric values (TypeError) before
                    # any total is touched, so a bad entry is skipped as a
                    # whole instead of contributing seconds but no session.
                    duration = record.get("duration_s", 0) + 0.0
                    cost = record.get("cost_est", 0) + 0.0
                except (ValueError, TypeError, AttributeError):
                    # Malformed JSON, non-object line, or non-numeric field.
                    continue
                total_s += duration
                total_cost += cost
                sessions += 1
    except FileNotFoundError:
        # No usage recorded yet.
        pass
    return {
        "sessions": sessions,
        "total_s": round(total_s, 1),
        "total_cost_usd": round(total_cost, 6),
    }
|
|
|
|
@app.get("/api/stt/check")
async def stt_check():
    """Check if Deepgram STT is available (API key set + key validates).

    Returns ``{"available": True}`` when Deepgram answers 200 for the
    configured key, otherwise ``{"available": False, "reason": ...}`` where
    the reason is ``no_key``, ``invalid_key`` (401/403), ``http_<status>``
    or ``unreachable`` (any error while performing the request).
    """
    import httpx as _httpx

    token = os.getenv("DEEPGRAM_API_KEY")
    if not token:
        return {"available": False, "reason": "no_key"}

    # Ask Deepgram about the key itself; only the status code matters.
    try:
        async with _httpx.AsyncClient(timeout=5) as http:
            resp = await http.get(
                "https://api.deepgram.com/v1/auth/token",
                headers={"Authorization": f"Token {token}"},
            )
        status = resp.status_code
    except Exception:
        return {"available": False, "reason": "unreachable"}

    if status == 200:
        return {"available": True}
    reason = "invalid_key" if status in (401, 403) else f"http_{status}"
    return {"available": False, "reason": reason}
|
|
|
|
# Server-side STT usage log. Despite the .json extension the file holds one
# JSON object per line, appended by _log_stt_usage.
_STT_USAGE_FILE = os.path.join("data", "stt_usage.json")


def _log_stt_usage(ip: str, lang: str, model: str, duration_s: float):
    """Append STT usage entry to data/stt_usage.json (best effort).

    Args:
        ip: Client address recorded with the entry.
        lang: Language code the session was opened with.
        model: STT model name; selects the per-minute rate used for the
            cost estimate (unlisted models use the 0.0043 rate).
        duration_s: Session length in seconds.

    Logging must never break the caller: any failure to create the
    directory or write the line is reported on stdout and otherwise ignored.
    """
    import json as _j

    cost_per_min = {"nova-3": 0.0043, "nova-2": 0.0036}
    entry = {
        "ts": _rl_time.strftime("%Y-%m-%dT%H:%M:%S"),
        "ip": ip,
        "lang": lang,
        "model": model,
        "duration_s": round(duration_s, 1),
        "cost_est": round(duration_s / 60 * cost_per_min.get(model, 0.0043), 6),
    }
    try:
        # Directory creation sits inside the guard too: this function is
        # called from a `finally` clause, where an escaping OSError would
        # mask the real outcome of the session.
        os.makedirs(os.path.dirname(_STT_USAGE_FILE) or "data", exist_ok=True)
        with open(_STT_USAGE_FILE, "a", encoding="utf-8") as f:
            f.write(_j.dumps(entry) + "\n")
    except Exception as err:
        print(f"[STT usage] could not write log: {err}", flush=True)
|
|
|
|
@app.websocket("/api/stt")
async def stt_ws(websocket: WebSocket):
    """Proxy WebSocket: browser mic → Deepgram STT → transcript events.

    Accepts the browser connection, opens a second WebSocket to Deepgram's
    ``/v1/listen`` endpoint and relays traffic both ways: binary/text frames
    from the browser go to Deepgram, Deepgram messages go back as text.

    Recognized query parameters (all optional): ``lang`` (default ``cs``),
    ``sample_rate`` (``16000``), ``model`` (``nova-3``), ``endpointing``
    (``300``), ``utterance_end_ms`` and repeated ``keywords``.

    The connection is closed with code 1011 when the ``websockets`` package
    is missing and 1008 when ``DEEPGRAM_API_KEY`` is not set. Once a session
    has started, its duration is always logged via ``_log_stt_usage``.
    """
    await websocket.accept()

    import asyncio as _asyncio
    import json as _json
    from urllib.parse import quote as _quote
    from urllib.parse import urlencode as _urlencode
    try:
        import websockets as _ws
    except ImportError:
        await websocket.close(code=1011, reason="websockets not installed on server")
        return

    api_key = os.getenv("DEEPGRAM_API_KEY")
    if not api_key:
        await websocket.close(code=1008, reason="No DEEPGRAM_API_KEY")
        return

    _stt_start = _rl_time.time()
    _stt_ip = _get_client_ip(websocket)
    params = websocket.query_params
    lang = params.get("lang", "cs")
    sample_rate = params.get("sample_rate", "16000")
    model = params.get("model", "nova-3")
    endpointing = params.get("endpointing", "300")
    utterance_end_ms = params.get("utterance_end_ms", "")

    # Client-supplied values are percent-encoded rather than pasted into the
    # URL, so a value containing "&", "=" or spaces cannot inject extra
    # Deepgram options or produce a malformed URL. ":" stays literal because
    # keyword boosts are written as "word:weight".
    dg_query = [
        ("model", model),
        ("language", lang),
        ("encoding", "linear16"),
        ("sample_rate", sample_rate),
        ("channels", "1"),
        ("interim_results", "true"),
        ("smart_format", "true"),
        ("punctuate", "true"),
        ("endpointing", endpointing),
        ("vad_events", "true"),
    ]
    if utterance_end_ms:
        dg_query.append(("utterance_end_ms", utterance_end_ms))
    dg_query.extend(("keywords", kw) for kw in params.getlist("keywords"))
    dg_url = "wss://api.deepgram.com/v1/listen?" + _urlencode(
        dg_query, safe=":", quote_via=_quote
    )

    try:
        async with _ws.connect(
            dg_url,
            additional_headers={"Authorization": f"Token {api_key}"},
            max_size=None,
        ) as dg:
            async def relay_dg():
                # Deepgram -> browser. Stops quietly as soon as either side
                # of the proxy goes away.
                try:
                    async for msg in dg:
                        try:
                            await websocket.send_text(msg if isinstance(msg, str) else msg.decode())
                        except Exception:
                            return
                except Exception:
                    pass

            dg_task = _asyncio.create_task(relay_dg())
            try:
                # Browser -> Deepgram until the browser disconnects.
                while True:
                    msg = await websocket.receive()
                    if msg.get("type") == "websocket.disconnect":
                        break
                    if msg.get("bytes"):
                        await dg.send(msg["bytes"])
                    elif msg.get("text"):
                        await dg.send(msg["text"])
            except Exception:
                # Covers WebSocketDisconnect as well as send failures; either
                # way the session is over and cleanup below takes care of it.
                pass
            finally:
                dg_task.cancel()
                try:
                    # Tell Deepgram the stream is finished and give it a
                    # moment before the connection context closes.
                    await dg.send(_json.dumps({"type": "CloseStream"}))
                    await _asyncio.sleep(0.3)
                except Exception:
                    pass
    except Exception:
        # Failure to reach Deepgram (or an error escaping the session): make
        # sure the browser side is closed; it may already be gone.
        try:
            await websocket.close()
        except Exception:
            pass
    finally:
        _log_stt_usage(_stt_ip, lang, model, _rl_time.time() - _stt_start)
|
|
|
|
if __name__ == "__main__":
    # Running this file directly starts uvicorn bound to the loopback
    # interface on port 8000.
    import uvicorn

    uvicorn.run(app, port=8000, host="127.0.0.1")
|
|