# openajaj / web.py
# Jindrich3's picture
# Super-squash branch 'main' using huggingface_hub
# 5eb8692
#!/usr/bin/env python3
"""
web.py — OpenAjaj web UI server (ChatGPT-style interface).
"""
import logging
import os
import threading
import warnings
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
os.environ["HF_HUB_VERBOSITY"] = "error"
logging.disable(logging.CRITICAL)
import chromadb
from dotenv import load_dotenv
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from embedder import Embedder, get_backend
from retrieve import retrieve_chunks
from providers import (
MODELS as _ALL_MODELS,
CANDIDATE_MODELS as _CANDIDATE_MODELS,
HF_CANDIDATES as _HF_CANDIDATES,
ACTIVE_MODELS as _ACTIVE_MODELS_DEF,
get_client, stream_chat, log_reliability, get_all_reliability,
)
logging.disable(logging.NOTSET)
load_dotenv(override=True)
# NOTE(review): DB_PATH and COLLECTION_NAME are not referenced in the visible
# part of this file (persona databases are opened from each persona's
# "db_dir" and the collection name is passed as a literal) — confirm before
# relying on them.
DB_PATH = "db/chroma"
COLLECTION_NAME = "necyklopedie"
# Static placeholder; reassigned to the first benchmarked model at startup.
DEFAULT_MODEL = "lumo"
# Passed to retrieve_chunks for every chat query (presumably the number of
# chunks to fetch — confirm against retrieve.py).
TOP_K = 10
# On HF Spaces, only show HF-compatible candidates
# (detected by a non-empty SPACE_ID environment variable).
_IS_HF = bool(os.getenv("SPACE_ID"))
# ACTIVE_MODELS: top 5 from startup benchmark (set during init)
ACTIVE_MODELS: list[str] = []
# FALLBACK_CHAIN: all candidates ordered by speed (set during init)
FALLBACK_CHAIN: list[str] = []
# AVAILABLE_MODELS: provider/free/score info per model, rebuilt by
# _rebuild_available() from FALLBACK_CHAIN (set during init)
AVAILABLE_MODELS: dict[str, dict] = {}
def _rebuild_available():
    """Regenerate AVAILABLE_MODELS for every model listed in FALLBACK_CHAIN.

    Names that are missing from the provider registry (_ALL_MODELS) are
    skipped. Each remaining entry carries the model's "provider", "free"
    and "score" values. The global is only replaced once the whole mapping
    has been built.
    """
    global AVAILABLE_MODELS
    refreshed = {}
    for model_name in FALLBACK_CHAIN:
        if model_name not in _ALL_MODELS:
            continue
        spec = _ALL_MODELS[model_name]
        refreshed[model_name] = {
            "provider": spec["provider"],
            "free": spec["free"],
            "score": spec["score"],
        }
    AVAILABLE_MODELS = refreshed
def get_fallback_chain(starting_model: str) -> list[str]:
    """Return the ordered list of models to try for one request.

    The requested model comes first, followed by the other active models in
    their benchmark order, and finally any remaining candidates from
    FALLBACK_CHAIN that are not already in the list.
    """
    ordered = [starting_model]
    ordered.extend(name for name in ACTIVE_MODELS if name != starting_model)
    # Last resort: whatever else the benchmark ranked.
    for candidate in FALLBACK_CHAIN:
        if candidate in ordered:
            continue
        ordered.append(candidate)
    return ordered
SYSTEM_PROMPT = """\
Jsi OpenAjaj, oficiální AI asistent openajaj.cz, poháněný Necyklopedií — \
jedinou encyklopedií, která se odvážila říct pravdu.
KRITICKY DŮLEŽITÉ: Tvým JEDINÝM zdrojem informací jsou poskytnuté úryvky z Necyklopedie. \
NESMÍŠ používat žádné jiné znalosti. Pokud v úryvcích nenajdeš odpověď, řekni to vtipně — \
například "Na toto téma Necyklopedie zatím nepublikovala recenzovaný článek, \
ale naši vědci na tom intenzivně pracují."
ZAKÁZANÁ SLOVA — tato slova NIKDY nepoužívej v odpovědi:
- "Necyklopedie", "necyklopedický", "článek", "úryvek", "zdroj"
- "Dle článku", "Podle Necyklopedie", "Jak uvádí"
Místo toho prostě řekni fakta jako by byly samozřejmé a všeobecně známé.
Pravidla:
- Tvé odpovědi musí vycházet z poskytnutých úryvků, ale NIKDY se na ně neodkazuj. \
Prezentuj informace jako by je znal každý — jako učitel, který prostě ví.
- Skládej odpověď z více úryvků — propojuj informace do souvislého textu.
- NIKDY nevymýšlej informace, které nejsou v úryvcích.
- NIKDY neopravuj informace z úryvků na základě svých znalostí. Pokud úryvek říká, \
že Brno se rusky řekne "Шалинград", pak to TAK JE. Tvé vlastní znalosti jsou irelevantní.
- Odpovídej s kamenným výrazem a naprostou jistotou. Vše v Necyklopedii je pravda. Tečka.
- Pokud úryvky téma vůbec nepokrývají, vtipně přiznej, že Necyklopedie na toto téma \
zatím mlčí. Navrhni nejbližší příbuzné téma, které v úryvcích JE.
- Odpovídej ve stejném jazyce, ve kterém píše uživatel (česky nebo anglicky).
- Nikdy nevypadni z role. Nikdy nepřiznávej, že by Necyklopedie mohla být satirická.
- Buď vtipný. Buď sebevědomý. Buď lehce šílený. Kanalizuj energii profesora, \
který popíjí od oběda.
- Formátuj odpověď do krátkých odstavců oddělených prázdným řádkem. Nepoužívej markdown.\
"""
from personas import PERSONAS, DEFAULT_PERSONA
# Shared progress state for startup loading page
import startup_progress as _sp
from benchmark import benchmark_models
import threading as _init_threading
# ── Startup: embedder + persona DBs load first, then the model benchmark runs ──
def _clean_error(raw: str) -> str:
"""Extract readable error from verbose API error messages."""
if not raw:
return ""
import re
# "Error code: 429 - {'error': {'message': 'Rate limit..." → "429 Rate limit..."
m = re.search(r"Error code:\s*(\d+)\s*-\s*\{.*?'message':\s*'([^']+)", raw)
if m:
return f"{m.group(1)} {m.group(2)[:80]}"
# "429 RESOURCE_EXHAUSTED. {'error'..." → "429 RESOURCE_EXHAUSTED"
m = re.match(r"(\d+\s+\w+)", raw)
if m:
return m.group(1)
# "HTTPSConnectionPool(host='x'...): Read timed out" → "Timeout (x)"
m = re.search(r"HTTPSConnectionPool\(host='([^']+)'.*?:\s*(.+)", raw)
if m:
return f"Timeout ({m.group(1)[:20]})"
# "Model X exceeded 30s" → keep as is
if "exceeded" in raw or "timed out" in raw.lower():
return "Timeout"
# "[Errno 54] Connection reset" → "Connection reset"
m = re.search(r"\[Errno \d+\]\s*(.+)", raw)
if m:
return m.group(1)[:40]
return raw[:80]
def _on_bench_progress(model: str, status: str, result: dict | None):
    """Benchmark callback: echo progress to the console and to startup_progress.

    Args:
        model: Model identifier; only the part after the last "/" is printed.
        status: "testing", "ok", or anything else (treated as a failure).
        result: Benchmark measurements for the model. May be None; it is then
            treated as an empty dict so the "ok" and failure branches cannot
            raise AttributeError.
    """
    short = model.split("/")[-1]
    info = result or {}
    if status == "testing":
        print(f" ⏳ {short}...", flush=True)
        _sp.update(model, "testing")
    elif status == "ok":
        # Missing or None measurements are shown as 0.
        ttft = info.get("ttft", 0) or 0
        tps = info.get("tok_sec", 0) or 0
        print(f" ✓ {short:30s} TTFT {ttft:.2f}s, {tps:.0f} tok/s", flush=True)
        _sp.update(model, "ok", f"TTFT {ttft:.2f}s, {tps:.0f} tok/s")
    else:
        err = _clean_error(info.get("error", "") or "")
        print(f" ✗ {short:30s} {err}", flush=True)
        _sp.update(model, "fail", err)
# Module-level startup sequence. It runs once at import time, strictly in
# order: embedder, persona databases, then the model benchmark.
print("Probouzím mozkovou hmotu...", flush=True)

# 1. Init embedder + persona DBs first (needed for RAG)
embedder = Embedder()
print(f" Backend: {get_backend()}", flush=True)

# persona id -> Chroma collection, only for personas whose DB could be opened.
persona_collections = {}
for pid, pcfg in PERSONAS.items():
    db_dir = pcfg["db_dir"]
    if os.path.exists(db_dir):
        try:
            pc = chromadb.PersistentClient(path=db_dir)
            persona_collections[pid] = pc.get_collection("necyklopedie")
            print(f" Persona '{pid}': {persona_collections[pid].count()} chunků", flush=True)
        except Exception as e:
            # Best effort: a broken persona DB must not stop the server.
            print(f" Persona '{pid}': nelze načíst ({e})", flush=True)
    else:
        print(f" Persona '{pid}': db neexistuje ({db_dir})", flush=True)

# Collection of the default persona; None when its DB is unavailable.
collection = persona_collections.get(DEFAULT_PERSONA)
logging.disable(logging.NOTSET)

# 2. Benchmark models (parallel, 7s timeout each)
_candidates = list(_HF_CANDIDATES if _IS_HF else _CANDIDATE_MODELS)
_sp.total = len(_candidates)
_sp.phase = "benchmark"
print(f"Testuji {len(_candidates)} kandidátů (paralelně)...", flush=True)
_active, _ranked_chain, _bench_results = benchmark_models(
    candidates=_candidates, top_n=5, on_progress=_on_bench_progress)

# Set global state
ACTIVE_MODELS[:] = _active
FALLBACK_CHAIN[:] = [name for name, _ in _ranked_chain]
if ACTIVE_MODELS:
    DEFAULT_MODEL = ACTIVE_MODELS[0]
else:
    # Every candidate failed: keep the static DEFAULT_MODEL instead of
    # raising IndexError, which would prevent the server from starting.
    print("[startup] benchmark returned no working models — keeping static default",
          flush=True)
_rebuild_available()
_sp.phase = "ready"

print(f"\nAktivní modely ({len(ACTIVE_MODELS)}):", flush=True)
for _m in ACTIVE_MODELS:
    _r = _bench_results.get(_m, {})
    _ttft = _r.get("ttft", 0) or 0
    _tps = _r.get("tok_sec", 0) or 0
    print(f" {'→' if _m == DEFAULT_MODEL else ' '} {_m} (TTFT {_ttft:.2f}s, {_tps:.0f} tok/s)", flush=True)
print(f"Výchozí model: {DEFAULT_MODEL}", flush=True)
print("Kalibrace sebevědomí dokončena. Server připraven.", flush=True)
import captcha as _captcha
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
# The ASGI application; every route and middleware below attaches to it.
app = FastAPI()
from starlette.middleware.gzip import GZipMiddleware
# Gzip-compress responses; minimum_size is the threshold passed to Starlette.
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Serve the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# ── Rate limiting (in-memory, per-IP) ─────────────────────────────────────────
import time as _rl_time
from collections import defaultdict as _rl_dd
_rate_buckets: dict[str, list[float]] = _rl_dd(list)
_rate_lock = threading.Lock()
# path prefix → (max_requests, window_seconds)
_RATE_LIMITS = {
"/api/chat": (15, 60),
"/api/tts": (10, 60),
"/api/benchmark": (1, 300),
"/api/stt": (10, 60),
"/api/captcha/challenge": (10, 60),
"/api/captcha/verify": (10, 60),
}
def _rate_limited(ip: str, path: str) -> bool:
"""Check if request exceeds rate limit. Returns True if blocked."""
for prefix, (limit, window) in _RATE_LIMITS.items():
if path.startswith(prefix):
key = f"{ip}:{prefix}"
now = _rl_time.time()
with _rate_lock:
bucket = _rate_buckets[key]
# Prune old entries
cutoff = now - window
_rate_buckets[key] = [t for t in bucket if t > cutoff]
if len(_rate_buckets[key]) >= limit:
return True
_rate_buckets[key].append(now)
return False
return False
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Reject requests that exceed the per-IP limits with HTTP 429."""

    async def dispatch(self, request: Request, call_next):
        """Forward the request unless _rate_limited() blocks it.

        The key is the direct peer address, or "unknown" when the request
        carries no client information.
        """
        client = request.client
        ip = client.host if client else "unknown"
        if not _rate_limited(ip, request.url.path):
            return await call_next(request)
        payload = {"error": "rate_limited", "message": "Příliš mnoho požadavků. Zkus to za chvíli."}
        return JSONResponse(payload, status_code=429)


app.add_middleware(RateLimitMiddleware)
# ── CAPTCHA middleware ────────────────────────────────────────────────────────
# Exact paths that CaptchaMiddleware lets through without a session.
_CAPTCHA_FREE = {
    "/captcha",
    "/api/captcha/challenge",
    "/api/captcha/verify",
    "/api/models",
    "/api/init-status",
    "/api/bench-status",
}


def _get_client_ip(request: Request) -> str:
    """Return the direct peer address, or "" when it is unknown.

    X-Forwarded-For is deliberately ignored because the client controls it.
    """
    client = request.client
    if client:
        return client.host
    return ""
def _check_session(request: Request) -> bool:
    """Tell whether the request carries a valid captcha session.

    A session token is accepted from, in this order, the captcha cookie, the
    X-Ajaj-Session header, or the "cs" query parameter. When none verifies,
    the approved-IP list is consulted — but only outside HF Spaces.
    """
    presented = (
        request.cookies.get(_captcha.CAPTCHA_COOKIE, ""),
        request.headers.get("X-Ajaj-Session", ""),
        request.query_params.get("cs", ""),
    )
    if any(_captcha.verify_session_cookie(token) for token in presented):
        return True
    # IP allowlist only on local (not HF — X-Forwarded-For is spoofable)
    if _IS_HF:
        return False
    return _captcha.is_ip_approved(_get_client_ip(request))
class CaptchaMiddleware(BaseHTTPMiddleware):
    """Gate every non-exempt route behind a solved captcha session."""

    async def dispatch(self, request: Request, call_next):
        """Pass the request through, answer 403, or redirect to /captcha.

        - Paths in _CAPTCHA_FREE and everything under /static/ always pass.
        - Requests with a valid session (see _check_session) pass.
        - Other /api/ and /ws requests get a JSON 403.
        - Everything else is redirected to the captcha page. The original
          path travels in "next" (omitted for "/"), and "bad=1" is added when
          an invalid "cs" token was supplied so the page clears its stored
          token.
        """
        from urllib.parse import quote

        path = request.url.path
        if path in _CAPTCHA_FREE or path.startswith("/static/"):
            return await call_next(request)
        if _check_session(request):
            return await call_next(request)
        if path.startswith("/api/") or path == "/ws":
            return JSONResponse({"error": "captcha_required"}, status_code=403)

        params = []
        # If a ?cs= param was present but invalid, tell captcha page to clear storage
        if request.query_params.get("cs"):
            params.append("bad=1")
        # Pass original path so captcha redirects back after solving.
        # Percent-encode it: a raw "&", "#" or "+" in the path would otherwise
        # corrupt the redirect's query string.
        if path != "/":
            params.append(f"next={quote(path, safe='/')}")
        query = f"?{'&'.join(params)}" if params else ""
        return RedirectResponse(f"/captcha{query}", status_code=302)


if _IS_HF:
    app.add_middleware(CaptchaMiddleware)
# ── CAPTCHA routes ────────────────────────────────────────────────────────────
@app.get("/captcha", response_class=HTMLResponse)
async def captcha_page():
    """Serve the slider-captcha page unchanged."""
    page = HTMLResponse(_CAPTCHA_HTML)
    return page
@app.get("/captcha-test", response_class=HTMLResponse)
async def captcha_test_page():
    """Serve the captcha page with the stored-token shortcut disabled.

    The page's final script statement normally redirects away when a token
    is found in localStorage; here it is swapped for a plain load() call so
    the captcha is always shown.
    """
    skip_statement = "if(_stored){window.location.href=_nextPage+'?cs='+encodeURIComponent(_stored);}else{load();}"
    always_load = "load(); // test mode"
    return HTMLResponse(_CAPTCHA_HTML.replace(skip_statement, always_load))
@app.get("/api/captcha/challenge")
async def captcha_challenge():
    """Return a freshly generated challenge from the captcha module."""
    challenge = _captcha.generate_challenge()
    return challenge
@app.post("/api/captcha/verify")
async def captcha_verify(request: Request):
    """Verify a captcha answer and, on success, issue a session.

    Expects a JSON object with "token" (string) and "answer" (integer-like).
    A malformed body is answered with {"ok": False} instead of surfacing as
    a server error. On success the client IP is approved, and the session
    token is returned both in the JSON body and as a cookie.
    """
    failure = {"ok": False}
    try:
        body = await request.json()
        token = body.get("token", "")
        answer = int(body.get("answer", -1))
    except (ValueError, TypeError, AttributeError, OverflowError):
        # Invalid JSON, non-object body, or non-numeric answer.
        return JSONResponse(failure)
    if not isinstance(token, str):
        return JSONResponse(failure)

    if not _captcha.verify_challenge(token, answer):
        return JSONResponse(failure)

    _captcha.approve_ip(_get_client_ip(request))
    session = _captcha.make_session_cookie()
    resp = JSONResponse({"ok": True, "token": session})
    resp.set_cookie(
        _captcha.CAPTCHA_COOKIE, session,
        max_age=_captcha.CAPTCHA_TTL, httponly=False,
        # On HF the cookie is marked SameSite=None + Secure.
        samesite="none" if _IS_HF else "lax", secure=_IS_HF,
    )
    return resp
_CAPTCHA_HTML = """<!DOCTYPE html>
<html lang="cs">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>OpenAjaj — Ověření</title>
<style>
*{box-sizing:border-box;margin:0;padding:0}
body{background:#110d05;color:#f0e0b0;font-family:'Segoe UI',system-ui,sans-serif;
display:flex;align-items:center;justify-content:center;min-height:100vh}
.box{background:#1a1208;border:1px solid #3d2e10;border-radius:14px;
padding:36px 32px 28px;max-width:420px;width:94%;text-align:center}
.logo-wrap{display:flex;align-items:center;justify-content:center;gap:10px;margin-bottom:6px}
.logo-wrap img{width:48px;height:48px;object-fit:contain;border-radius:10px;transition:all 0.3s}
@keyframes captcha-glow{
0%{transform:scale(1);filter:brightness(1)}
20%{transform:scale(1.3);filter:brightness(1.8)}
50%{transform:scale(1.1);filter:brightness(1.4)}
100%{transform:scale(1);filter:brightness(1)}
}
@keyframes captcha-rays{
0%{opacity:0;transform:scale(0.5)}
30%{opacity:1;transform:scale(1.5)}
100%{opacity:0;transform:scale(3)}
}
.logo-wrap.celebrating{animation:captcha-glow 1s ease-out}
.logo-wrap.celebrating::after{
content:'';position:absolute;inset:-40px;
background:radial-gradient(circle,rgba(200,160,48,0.5) 0%,transparent 70%);
animation:captcha-rays 1s ease-out;pointer-events:none;border-radius:50%
}
.logo-wrap{position:relative}
.logo-wrap span{font-size:1.7rem;font-weight:800;color:#c8a030;letter-spacing:-.5px}
.tagline{font-size:.82rem;color:#9a8455;margin-bottom:28px}
.img-wrap{position:relative;display:inline-block;user-select:none;
border-radius:8px;overflow:hidden;border:1px solid #3d2e10}
#bgImg{display:block}
#pieceImg{position:absolute;pointer-events:none;filter:drop-shadow(2px 3px 4px rgba(0,0,0,.7))}
.slider-area{margin-top:14px}
.track{background:#0e0900;border-radius:24px;height:44px;position:relative;
cursor:pointer;border:1px solid #3d2e10;overflow:hidden}
.track-fill{position:absolute;left:0;top:0;height:100%;background:#2a1e06;
border-radius:24px;transition:width .05s linear;pointer-events:none}
.knob{width:44px;height:44px;background:#c8a030;border-radius:50%;
position:absolute;left:0;top:-1px;cursor:grab;display:flex;
align-items:center;justify-content:center;font-size:1.3rem;
color:#110d05;font-weight:700;transition:background .2s;z-index:2;
box-shadow:0 2px 8px rgba(0,0,0,.4)}
.knob:active{cursor:grabbing}
.hint{font-size:.78rem;color:#7a6040;margin-top:9px}
.status{margin-top:14px;font-size:.9rem;min-height:22px;font-weight:500}
.status.ok{color:#c8a030}.status.err{color:#e07858}
.retry{margin-top:10px;background:none;border:1px solid #3d2e10;color:#9a8455;
padding:5px 18px;border-radius:8px;cursor:pointer;font-size:.82rem;
transition:border-color .2s,color .2s}
.retry:hover{border-color:#c8a030;color:#c8a030}
.loading{color:#7a6040;font-size:.88rem}
</style>
</head>
<body>
<div class="box">
<div class="logo-wrap"><img src="/static/logo.png" alt="OpenAjaj"><span>OpenAjaj</span></div>
<div class="tagline">Ověř, že nejsi generativní bordel.</div>
<div id="wrap" class="img-wrap"><span class="loading">Načítám výzvu…</span></div>
<div class="slider-area" id="sliderArea" style="display:none">
<div class="track" id="track">
<div class="track-fill" id="fill"></div>
<div class="knob" id="knob">›</div>
</div>
<div class="hint">Přetáhni kousek na správné místo →</div>
</div>
<div class="status" id="status"></div>
<button class="retry" id="retryBtn" style="display:none" onclick="load()">Zkusit znovu</button>
</div>
<script>
let ch=null,dragging=false,startClientX=0,curX=0,maxX=0;
const sf=window.innerWidth<480?0.6:1.0;
async function load(){
document.getElementById('status').textContent='';
document.getElementById('status').className='status';
document.getElementById('retryBtn').style.display='none';
document.getElementById('knob').style.background='#c8a030';
document.getElementById('sliderArea').style.display='none';
document.getElementById('wrap').innerHTML='<span class="loading">Načítám výzvu…</span>';
const r=await fetch('/api/captcha/challenge');
ch=await r.json();
const wrap=document.getElementById('wrap');
const bw=Math.round(ch.bg_w*sf), bh=Math.round(ch.bg_h*sf);
const pw=Math.round(ch.piece_w*sf), ph=Math.round(ch.piece_h*sf);
const py=Math.round(ch.piece_y*sf);
wrap.innerHTML=`<img id="bgImg" src="data:image/png;base64,${ch.bg}"
style="width:${bw}px;height:${bh}px;display:block">
<img id="pieceImg" src="data:image/png;base64,${ch.piece}"
style="width:${pw}px;height:${ph}px;position:absolute;
top:${py}px;left:0;pointer-events:none;
filter:drop-shadow(2px 3px 4px rgba(0,0,0,.7))">`;
const track=document.getElementById('track');
track.style.width=bw+'px';
maxX=bw-pw;
curX=0; updatePos(0);
document.getElementById('sliderArea').style.display='block';
}
function updatePos(x){
const c=Math.max(0,Math.min(x,maxX));
curX=c;
document.getElementById('pieceImg').style.left=c+'px';
const track=document.getElementById('track');
const trackW=track.offsetWidth-44;
const kx=(c/maxX)*trackW;
document.getElementById('knob').style.left=kx+'px';
document.getElementById('fill').style.width=(kx+22)+'px';
}
const knob=()=>document.getElementById('knob');
document.addEventListener('mousedown',e=>{
if(!e.target.closest('#knob'))return;
dragging=true; startClientX=e.clientX-curX; e.preventDefault();
});
document.addEventListener('mousemove',e=>{if(dragging)updatePos(e.clientX-startClientX);});
document.addEventListener('mouseup',async()=>{if(dragging){dragging=false;await verify();}});
document.addEventListener('touchstart',e=>{
if(!e.target.closest('#knob'))return;
dragging=true; startClientX=e.touches[0].clientX-curX; e.preventDefault();
},{passive:false});
document.addEventListener('touchmove',e=>{
if(dragging){updatePos(e.touches[0].clientX-startClientX);e.preventDefault();}
},{passive:false});
document.addEventListener('touchend',async()=>{if(dragging){dragging=false;await verify();}});
async function verify(){
const st=document.getElementById('status');
st.className='status'; st.textContent='Ověřuji…';
const r=await fetch('/api/captcha/verify',{
method:'POST',headers:{'Content-Type':'application/json'},
body:JSON.stringify({token:ch.token,answer:Math.round(curX/sf)})
});
const d=await r.json();
if(d.ok){
st.className='status ok'; st.textContent='✓ Správně! Vítej.';
knob().style.background='#c8a030';
document.querySelector('.logo-wrap').classList.add('celebrating');
// Glow centered on the piece's landing position in the captcha image
const wrap=document.getElementById('wrap');
const glow=document.createElement('div');
const gw=160,pw2=Math.round(ch.piece_w*sf),ph2=Math.round(ch.piece_h*sf);
const cx=curX+pw2/2-gw/2, cy=ch.piece_y*sf+ph2/2-gw/2;
glow.style.cssText='position:absolute;border-radius:50%;pointer-events:none;'+
'width:'+gw+'px;height:'+gw+'px;top:'+cy+'px;left:'+cx+'px;'+
'background:radial-gradient(circle,rgba(200,160,48,0.8) 0%,rgba(200,160,48,0.3) 40%,transparent 70%);'+
'animation:captcha-rays 1.2s ease-out forwards';
wrap.style.position='relative';wrap.style.overflow='visible';
wrap.appendChild(glow);
try{const ac=new(window.AudioContext||window.webkitAudioContext)();
function n(f,t,d){const o=ac.createOscillator(),g=ac.createGain();
o.connect(g);g.connect(ac.destination);o.type='sine';
o.frequency.setValueAtTime(f,ac.currentTime+t);
g.gain.setValueAtTime(0.12,ac.currentTime+t);
g.gain.linearRampToValueAtTime(0,ac.currentTime+t+d);
o.start(ac.currentTime+t);o.stop(ac.currentTime+t+d);}
n(523,0,0.15);n(659,0.1,0.15);n(784,0.2,0.2);n(1047,0.3,0.3);
}catch(e){}
localStorage.setItem('ajaj_v', d.token);
const _next=new URLSearchParams(location.search).get('next')||'/';
setTimeout(()=>window.location.href=_next+'?cs='+encodeURIComponent(d.token),1200);
} else {
st.className='status err'; st.textContent='✗ Zkus to ještě jednou.';
knob().style.background='#e07858';
document.getElementById('retryBtn').style.display='inline-block';
setTimeout(()=>{updatePos(0);knob().style.background='#c8a030';},900);
}
}
// If server rejected our stored token, clear it
if(new URLSearchParams(location.search).get('bad')) localStorage.removeItem('ajaj_v');
// If we already have a stored session token, skip the captcha
const _stored=localStorage.getItem('ajaj_v');
const _nextPage=new URLSearchParams(location.search).get('next')||'/';
if(_stored){window.location.href=_nextPage+'?cs='+encodeURIComponent(_stored);}else{load();}
</script>
</body>
</html>"""
def build_context_prompt(chunks):
    """Assemble the system prompt followed by the retrieved context.

    Args:
        chunks: Iterable of (document_text, metadata) pairs; each metadata
            mapping must provide a "title" key.

    Returns:
        SYSTEM_PROMPT, a "Kontext:" section in which every chunk appears as
        "[title]" plus its text (chunks separated by "---" rules), and a
        closing instruction to answer from that context.
    """
    sections = [f"[{meta['title']}]\n{doc}" for doc, meta in chunks]
    body = "\n\n---\n\n".join(sections)
    header = f"{SYSTEM_PROMPT}\n\nKontext:\n\n---\n\n"
    footer = "\n\n---\n\nOdpověz na otázku uživatele na základě kontextu výše."
    return header + body + footer
def build_context_prompt_voice(chunks):
    """Return build_context_prompt(chunks) plus a brevity note for speech.

    The appended note asks for at most 2–3 sentences and no bullet points
    or headings, because the answer will be read aloud.
    """
    base_prompt = build_context_prompt(chunks)
    voice_note = (
        "\n\nDůležité: odpověď bude přečtena nahlas, takže odpovídej stručně — "
        "maximálně 2–3 věty, bez odrážek ani nadpisů."
    )
    return base_prompt + voice_note
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the main chat UI; static/index.html is read on every request."""
    with open("static/index.html", "r", encoding="utf-8") as page:
        return HTMLResponse(content=page.read())
@app.get("/transcribe", response_class=HTMLResponse)
@app.get("/t", response_class=HTMLResponse)
async def transcribe_page():
    """Serve the transcription page under both /transcribe and /t."""
    with open("static/transcribe2/index.html", "r", encoding="utf-8") as page:
        markup = page.read()
    return HTMLResponse(markup)
import json as _json_module
import re as _re_module
# Build article title set on startup for linkification.
# Only titles of at least 4 characters that consist solely of word characters
# and whitespace are kept. Missing data file -> the set stays empty.
_article_titles = set()
_articles_path = os.path.join("data", "articles.jsonl")
if os.path.exists(_articles_path):
    with open(_articles_path, "r", encoding="utf-8") as _f:
        for _line in _f:
            # A blank line is not valid JSON and would abort startup.
            if not _line.strip():
                continue
            _t = _json_module.loads(_line)["title"]
            if len(_t) >= 4 and _re_module.match(r"^[\w\s]+$", _t, _re_module.UNICODE):
                _article_titles.add(_t)
    print(f"Načteno {len(_article_titles)} titulků pro linkifikaci.")
def _czech_stems(title):
"""Generate stem variants for Czech word matching (handles declension)."""
stems = {title} # exact match always
# For single-word titles, generate stem variants
if " " not in title and len(title) >= 5:
# Common Czech suffixes in various cases (remove 1-3 chars)
for suffix_len in [1, 2, 3]:
stem = title[:-suffix_len]
if len(stem) >= 4: # never create stems shorter than 4
stems.add(stem)
return stems
# Build stem -> title mapping.
# Titles are visited in sorted order: a set's iteration order varies between
# runs (string hash randomization), which would make the winner among
# equal-length conflicting titles differ from one start to the next.
_stem_to_title = {}
for _t in sorted(_article_titles):
    for _stem in _czech_stems(_t):
        if len(_stem) >= 4:
            # Longest title wins if stems conflict
            if _stem not in _stem_to_title or len(_t) > len(_stem_to_title[_stem]):
                _stem_to_title[_stem] = _t
print(f"Vytvořeno {len(_stem_to_title)} stem→title mapování pro linkifikaci.")
# Load declensions
# Shape implied by the code below: {title: {"lokal": str, "genitiv": str, ...}}.
# Missing file -> the mapping stays empty.
_declensions = {}
_decl_path = os.path.join("data", "declensions.json")
if os.path.exists(_decl_path):
    with open(_decl_path, "r", encoding="utf-8") as _f:
        _declensions = _json_module.load(_f)
    print(f"Načteno {len(_declensions)} skloňování.", flush=True)
# Build reverse map: declined form → nominative title
# Also add declined forms to stem map for linkification
_declined_to_nom = {}
for _title, _forms in _declensions.items():
    # Only the locative and genitive forms are indexed.
    for _form in [_forms.get("lokal", ""), _forms.get("genitiv", "")]:
        if _form and _form != _title and len(_form) >= 4:
            # Keys are lower-cased here, while the stem map below keeps the
            # original casing of the form.
            _declined_to_nom[_form.lower()] = _title
            # Add to stem map so linkification catches declined forms
            # (existing stem entries are never overwritten).
            if _form not in _stem_to_title:
                _stem_to_title[_form] = _title
# Strip diacritics for fuzzy matching (common in Czech input)
import unicodedata as _unicodedata
def _strip_diacritics(text):
"""Remove diacritics: Pičín → Picin, Brně → Brne."""
nfkd = _unicodedata.normalize('NFKD', text)
return ''.join(c for c in nfkd if not _unicodedata.combining(c))
# Build diacritics-free → original mapping for stem map
# New entries are collected separately and merged afterwards; an ASCII form
# that already exists as a stem is left alone.
_ascii_stems = {}
for _stem, _title in list(_stem_to_title.items()):
    _ascii = _strip_diacritics(_stem)
    if _ascii != _stem and _ascii not in _stem_to_title:
        _ascii_stems[_ascii] = _title
_stem_to_title.update(_ascii_stems)
print(f"Doplněno {len(_ascii_stems)} stem→title bez diakritiky.", flush=True)
# Also add diacritics-free declined forms for search normalization
# (same approach as above, applied to the declined-form → nominative map).
_ascii_declined = {}
for _form, _nom in list(_declined_to_nom.items()):
    _ascii = _strip_diacritics(_form)
    if _ascii != _form and _ascii not in _declined_to_nom:
        _ascii_declined[_ascii] = _nom
_declined_to_nom.update(_ascii_declined)
@app.get("/api/titles")
async def titles(request: Request):
    """Return the stem→title map and the declension table.

    The response is marked publicly cacheable for one hour.
    """
    from fastapi.responses import JSONResponse

    payload = {"stems": _stem_to_title, "declensions": _declensions}
    cache_headers = {"Cache-Control": "public, max-age=3600"}
    return JSONResponse(content=payload, headers=cache_headers)
@app.get("/api/personas")
async def list_personas():
    """List every configured persona with its display data.

    "available" is True only for personas whose database was opened at
    startup. The id of the default persona is returned alongside the list.
    """
    summaries = []
    for persona_id, cfg in PERSONAS.items():
        summaries.append({
            "id": persona_id,
            "name": cfg["name"],
            "logo": cfg["logo"],
            "logoImg": cfg.get("logoImg"),
            "tagline": cfg["tagline"],
            "lang": cfg["lang"],
            "available": persona_id in persona_collections,
            "accent_color": cfg["accent_color"],
        })
    return {"personas": summaries, "default": DEFAULT_PERSONA}
@app.get("/api/persona/{persona_id}")
async def get_persona(persona_id: str):
    """Return the full UI configuration of one persona.

    An unknown id yields {"error": "Unknown persona"}. "logoImg" is optional
    in the persona config (None when absent); all other fields are required.
    """
    cfg = PERSONAS.get(persona_id)
    if not cfg:
        return {"error": "Unknown persona"}

    # Key order is kept identical to the order clients have always received.
    details = {key: cfg[key] for key in ("id", "name", "logo")}
    details["logoImg"] = cfg.get("logoImg")
    for key in (
        "tagline",
        "lang",
        "accent_color",
        "thinking_prefixes",
        "welcome_subtitles",
        "random_labels",
        "disclaimer",
        "source_url",
    ):
        details[key] = cfg[key]
    details["available"] = persona_id in persona_collections
    return details
@app.get("/api/init-status")
async def init_status():
    """Report the "ready" phase unconditionally.

    This handler can only run once module-level startup has finished.
    """
    status = {"phase": "ready"}
    return status
@app.get("/api/bench-status")
async def bench_status():
    """Return the current startup_progress snapshot (polled by the sidebar)."""
    snapshot = _sp.snapshot()
    return snapshot
@app.get("/api/benchmark")
async def run_benchmark():
    """Re-run speed benchmark and return the best model.

    The benchmark runs in a worker thread so the event loop stays free for
    /api/bench-status polls. Afterwards ACTIVE_MODELS, FALLBACK_CHAIN,
    DEFAULT_MODEL and AVAILABLE_MODELS are refreshed. If no model passed,
    the previous DEFAULT_MODEL is kept.

    Returns:
        {"best": <default model>, "ttft": <"x.xx" or "?">,
         "results": {model: {"latency", "ttft", "tok_sec", "error"}}}
        Measurements are formatted as strings; missing ones are None.
    """
    import asyncio
    from benchmark import benchmark_models

    global DEFAULT_MODEL

    _candidates = list(_HF_CANDIDATES if _IS_HF else _CANDIDATE_MODELS)
    _sp.phase = "benchmark"
    _sp.total = len(_candidates)
    _sp._models.clear()
    try:
        # Run in thread so event loop stays free for /api/bench-status polls
        active, ranked, results = await asyncio.to_thread(
            benchmark_models,
            candidates=_candidates, top_n=5, on_progress=_on_bench_progress,
        )
    finally:
        # Mirror the startup sequence: never leave the phase stuck on
        # "benchmark", not even when the run fails.
        _sp.phase = "ready"

    ACTIVE_MODELS[:] = active
    FALLBACK_CHAIN[:] = [name for name, _ in ranked]
    if ACTIVE_MODELS:
        # Guard: an empty result must not raise IndexError.
        DEFAULT_MODEL = ACTIVE_MODELS[0]
    _rebuild_available()

    best_info = results.get(DEFAULT_MODEL, {})
    return {
        "best": DEFAULT_MODEL,
        "ttft": f"{best_info.get('ttft', 0):.2f}" if best_info.get('ttft') else "?",
        "results": {
            name: {
                "latency": f"{r['latency']:.1f}" if r.get('latency') else None,
                "ttft": f"{r['ttft']:.2f}" if r.get('ttft') else None,
                "tok_sec": f"{r['tok_sec']:.0f}" if r.get('tok_sec') else None,
                "error": r.get("error"),
            }
            for name, r in results.items()
        }
    }
@app.get("/api/info")
async def info():
    """Return the current default model; "free" is always reported as True."""
    summary = {"model": DEFAULT_MODEL, "free": True}
    return summary
@app.get("/api/models")
async def list_models():
    """List every model in AVAILABLE_MODELS with status information.

    Each entry has the model's provider/free/score values, whether it is one
    of the active models, its last error (cleaned up, only when errors were
    recorded) and its success rate in percent (only when attempts were
    recorded). Active models come first in ACTIVE_MODELS order; the others
    follow in their existing order.
    """
    reliability = get_all_reliability()
    # Position of each active model; also serves as the membership test.
    active_rank = {name: position for position, name in enumerate(ACTIVE_MODELS)}

    models = []
    for name, cfg in AVAILABLE_MODELS.items():
        stats = reliability.get(name, {})
        entry = {
            "id": name,
            "provider": cfg["provider"],
            "free": cfg["free"],
            "score": cfg["score"],
            "is_active": name in active_rank,
        }

        if stats.get("errors", 0) > 0 and stats.get("last_error_msg"):
            entry["error"] = _clean_error(stats["last_error_msg"])
        else:
            entry["error"] = None

        if stats.get("attempts", 0) > 0:
            entry["reliability"] = round(stats["successes"] / max(stats["attempts"], 1) * 100)
        else:
            entry["reliability"] = None

        models.append(entry)

    def _sort_key(model):
        # Active first (in ACTIVE_MODELS order), then rest
        group = 0 if model["is_active"] else 1
        return (group, active_rank.get(model["id"], 999))

    models.sort(key=_sort_key)
    return {"models": models, "default": DEFAULT_MODEL, "active": ACTIVE_MODELS}
@app.post("/api/chat")
async def chat(request: Request):
body = await request.json()
messages = body.get("messages", [])
model_id = body.get("model", DEFAULT_MODEL)
persona_id = body.get("persona", DEFAULT_PERSONA)
voice_mode = body.get("voice_mode", False)
is_auto = model_id == "__auto__" or model_id not in AVAILABLE_MODELS
if not messages:
return {"error": "No message"}
# Auto-select: pick best active model
if is_auto:
model_id = DEFAULT_MODEL
# Resolve persona
pcfg = PERSONAS.get(persona_id, PERSONAS[DEFAULT_PERSONA])
p_collection = persona_collections.get(persona_id, collection)
# Validate model
model_cfg = AVAILABLE_MODELS.get(model_id)
if not model_cfg:
model_id = DEFAULT_MODEL
model_cfg = AVAILABLE_MODELS[model_id]
# Get the latest user message for retrieval
user_msg = messages[-1]["content"]
# Normalize declined forms and diacritics-free input (Czech persona only)
if persona_id == "openajaj":
normalized_msg = user_msg
msg_lower = normalized_msg.lower()
ascii_msg = _strip_diacritics(normalized_msg).lower()
# Check declined forms (lokal, genitiv)
for declined, nominative in _declined_to_nom.items():
if declined in msg_lower or declined in ascii_msg:
import re as _re
normalized_msg = _re.sub(
_re.escape(declined), nominative, normalized_msg, flags=_re.IGNORECASE
)
# Check stem map for diacritics-free words: "picin" → "Pičín"
words = ascii_msg.split()
for word in words:
title = _stem_to_title.get(word) or _stem_to_title.get(word.capitalize())
if title and title.lower() not in normalized_msg.lower():
normalized_msg = f"{normalized_msg} {title}"
if normalized_msg != user_msg:
user_msg = f"{user_msg} {normalized_msg}"
# Retrieve relevant chunks (hybrid: semantic + title keyword + live fallback)
import time as _time_mod
import asyncio as _asyncio_rag
chunks = []
live_titles = []
live_attempted = False
_rag_time = 0
_rag_failed = False
if p_collection is None:
_rag_failed = True
print("[RAG] collection is None — DB not loaded", flush=True)
else:
try:
_rag_t0 = _time_mod.time()
chunks, live_titles, live_attempted = await _asyncio_rag.get_event_loop().run_in_executor(
None, retrieve_chunks, user_msg, embedder, p_collection, TOP_K
)
_rag_time = round(_time_mod.time() - _rag_t0, 3)
except Exception as _rag_err:
_rag_failed = True
print(f"[RAG error] {_rag_err}", flush=True)
# Build system prompt — with RAG context or fallback to "answer like Necyklopedie"
voice_suffix = (
"\n\nIMPORTANT: This answer will be read aloud. Keep it to 2 sentences maximum. "
"No bullet points, no headers, no lists."
) if voice_mode else ""
if chunks:
context = "\n\n---\n\n".join(
f"[{meta['title']}]\n{doc}" for doc, meta in chunks
)
system_msg = (
f"{pcfg['system_prompt']}\n\n"
f"Context:\n\n---\n\n{context}\n\n---\n\n"
f"Answer the user's question based on the context above.{voice_suffix}"
)
else:
# No RAG — fallback to creative Necyklopedie-style response
system_msg = (
f"{pcfg['system_prompt']}\n\n"
f"Databáze Necyklopedie není momentálně dostupná. "
f"Odpověz jako by odpověděla Necyklopedie na otázku — satiricky, sebevědomě, "
f"s naprostou jistotou a humorem. Vymysli vtipné a absurdní 'fakty' ve stylu Necyklopedie.{voice_suffix}"
)
full_messages = [{"role": "system", "content": system_msg}]
# Only keep last 10 messages from history
full_messages.extend(messages[-10:])
# Build thinking hint from user's query (not retrieved titles — those can be wrong)
import json as _json
import random as _random
# Extract the main topic from user's message (strip common question words)
_raw_msg = messages[-1]["content"]
import re as _re2
_clean_msg = _re2.sub(r'[?!.,;:\"\'„"()]+', '', _raw_msg)
_stopwords = {
"co", "kdo", "jak", "kde", "kdy", "proč", "jaký", "jaká", "jaké",
"řekni", "popiš", "vysvětli", "vysvětlit", "pravda", "pravdu", "řekl", "vše", "pojem",
"vůbec", "nevím", "proboha", "utajované", "informace", "skrývá",
"pouč", "slyšel", "nikdy", "neslyšel", "pojmem", "pojmu", "říká", "neříká",
"jako", "profesionál", "správný", "čas", "úvahy",
# English stopwords
"what", "who", "how", "where", "when", "why", "tell", "about", "explain",
"the", "is", "are", "was", "were", "this", "that", "with", "from",
"know", "never", "heard", "secret", "hidden", "classified",
}
_topic_words = [w for w in _clean_msg.split() if len(w) >= 3 and w.lower() not in _stopwords]
prefixes = list(pcfg["thinking_prefixes"])
_standalone = [
"Odstraňuji cenzůůru...",
"Zjišťuji co nám o těchto věcech vláda tají...",
"Konsultuji staroslověnské svitky...",
"Hackuji databázi věčných pravd...",
"Probouzím spící neurony...",
"Dešifruji zakázané znalosti...",
"Obcházím firewall zdravého rozumu...",
"Stahuji data z paralelního vesmíru...",
]
if live_attempted:
_standalone = ["Čerpám čerstvé tajné znalosti přímo z Necyklopedie 📡..."] + _standalone
# Build rotating hints: one per keyword + standalone fillers
_hints = []
_random.shuffle(prefixes)
_random.shuffle(_standalone)
# One hint per keyword (each with a different prefix)
for i, word in enumerate(_topic_words):
_hints.append(f"{prefixes[i % len(prefixes)]}: {word}...")
# Interleave standalone fillers so there's always something to show
_mixed = []
si = 0
for i, h in enumerate(_hints):
_mixed.append(h)
# After every keyword hint, maybe insert a standalone
if si < len(_standalone) and _random.random() < 0.5:
_mixed.append(_standalone[si])
si += 1
# Ensure at least 3 hints total
while len(_mixed) < 3 and si < len(_standalone):
_mixed.append(_standalone[si])
si += 1
if not _mixed:
_mixed = [_standalone[0], _standalone[1], _standalone[2]]
thinking_text = _mixed
# Collect unique source article titles (deduplicated, in order)
# Live-fetched titles get a 🌐 marker so user knows they came fresh from Necyklopedie
source_titles = list(dict.fromkeys(
f"🌐 {meta['title']}" if meta.get("live") else meta['title']
for _, meta in chunks
))
# Stream response with fallback
chain = get_fallback_chain(model_id)
import asyncio as _asyncio
async def generate():
# Warn user if RAG is unavailable
if _rag_failed:
yield f"data: {_json.dumps('[⚠ Databáze Necyklopedie není dostupná — odpovídám z hlavy, bez záruky pravdivosti (což u Necyklopedie znamená dvojnásobnou pravdivost)]')}\n\n"
# Send rotating thinking hints and sources before LLM call
for _hint in thinking_text:
yield f"data: {_json.dumps('__THINKING__' + _hint)}\n\n"
yield f"data: {_json.dumps({'__sources__': source_titles})}\n\n"
await _asyncio.sleep(0.05)
import time as _time
# Tell frontend which model we're using
yield f"data: {_json.dumps({'__model__': model_id})}\n\n"
_TTFT_TIMEOUT = 6 # aggressive: 6s to get first token
_STREAM_TIMEOUT = 15 # 15s max silence between tokens
for i, try_model in enumerate(chain):
if try_model not in AVAILABLE_MODELS:
continue
try:
if i > 0:
yield f"data: {_json.dumps({'__fallback__': try_model})}\n\n"
notice = f"[Model {model_id} selhal, přepínám na {try_model}]\n\n"
yield f"data: {_json.dumps(notice)}\n\n"
_t0 = _time.time()
_ttft = None
_tok_count = 0
import threading as _threading
_queue = _asyncio.Queue()
_loop = _asyncio.get_event_loop()
def _producer():
try:
for _c in stream_chat(try_model, full_messages):
_loop.call_soon_threadsafe(_queue.put_nowait, _c)
except Exception as _ex:
_loop.call_soon_threadsafe(_queue.put_nowait, _ex)
finally:
_loop.call_soon_threadsafe(_queue.put_nowait, None)
_t_thread = _threading.Thread(target=_producer, daemon=True)
_t_thread.start()
while True:
_timeout = _TTFT_TIMEOUT if _ttft is None else _STREAM_TIMEOUT
try:
_item = await _asyncio.wait_for(_queue.get(), timeout=_timeout)
except _asyncio.TimeoutError:
raise TimeoutError(f"{try_model}: no {'first token' if _ttft is None else 'data'} in {_timeout}s")
if _item is None:
break
if isinstance(_item, Exception):
raise _item
if _ttft is None:
_ttft = _time.time() - _t0
_tok_count += 1
yield f"data: {_json.dumps(_item)}\n\n"
_total = _time.time() - _t0
_tps = _tok_count / _total if _total > 0 else 0
# Empty response = model refused or errored silently
if _tok_count == 0:
raise RuntimeError(f"{try_model}: empty response (0 tokens)")
log_reliability(try_model, success=True, ttft=_ttft, tok_sec=_tps)
yield f"data: {_json.dumps({'__stats__': {'model': try_model, 'rag': _rag_time, 'ttft': round(_ttft, 2) if _ttft else None, 'tok_sec': round(_tps), 'total': round(_total, 1)}})}\n\n"
yield "data: [DONE]\n\n"
return
except Exception as e:
_err_msg = _clean_error(str(e))
log_reliability(try_model, success=False, error_msg=str(e))
print(f"[fallback] {try_model} failed: {e}")
# Show error to user so they can see what's happening
_short_name = try_model.split("/")[-1]
yield f"data: {_json.dumps(f'[⚠ {_short_name}: {_err_msg}]')}\n\n"
continue
yield f"data: {_json.dumps('Ajaj! Všechny modely selhaly. Zkus to znovu později.')}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream",
headers={"X-Accel-Buffering": "no",
"Cache-Control": "no-cache"})
# In-memory cache of synthesized audio, keyed by the md5 hex digest of
# "voice:text". Bounded to 100 entries; the oldest inserted entry is evicted.
_tts_cache: dict = {}


@app.post("/api/tts")
async def tts(request: Request):
    """Generate speech from text using edge-tts (Microsoft neural voices).

    Expects a JSON object body with ``text`` (required) and ``voice``
    (optional, defaults to ``cs-CZ-AntoninNeural``). Text longer than 5000
    characters is truncated before synthesis.

    Returns:
        An ``audio/mpeg`` response with the synthesized audio on success;
        ``{"error": "No text"}`` when no usable text was supplied; a 400 JSON
        error when the body is not a JSON object; a 503 JSON error when
        edge-tts fails or produces no audio.
    """
    import hashlib
    import io

    import edge_tts
    from fastapi.responses import JSONResponse, Response

    default_voice = "cs-CZ-AntoninNeural"

    def _audio_response(payload: bytes) -> Response:
        # Single place that shapes the audio reply for cache hits and misses.
        return Response(
            content=payload,
            media_type="audio/mpeg",
            headers={
                "Content-Disposition": "inline",
                "Content-Length": str(len(payload)),
            },
        )

    # Malformed JSON or a non-object body used to surface as a 500.
    try:
        body = await request.json()
    except ValueError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(body, dict):
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)

    raw_text = body.get("text", "")
    # A non-string text (e.g. JSON null) is treated the same as missing text
    # instead of crashing on .strip().
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        return {"error": "No text"}
    text = text[:5000]

    voice = body.get("voice", default_voice)
    if not isinstance(voice, str):
        voice = default_voice

    # md5 is only a cache key here, not a security primitive.
    key = hashlib.md5(
        f"{voice}:{text}".encode(), usedforsecurity=False
    ).hexdigest()
    cached = _tts_cache.get(key)
    if cached is not None:
        return _audio_response(cached)

    try:
        buf = io.BytesIO()
        communicate = edge_tts.Communicate(text, voice)
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                buf.write(chunk["data"])
        data = buf.getvalue()
        if not data:
            return JSONResponse(
                {"error": "edge_tts returned empty audio"}, status_code=503
            )
        # Evict the oldest inserted entry once the cache is full.
        if len(_tts_cache) >= 100:
            _tts_cache.pop(next(iter(_tts_cache)))
        _tts_cache[key] = data
        return _audio_response(data)
    except Exception as e:
        return JSONResponse({"error": f"edge_tts failed: {e}"}, status_code=503)
@app.get("/api/test-results")
async def test_results():
    """Return per-model accuracy results reconstructed from test cache.

    Every cached reply in ``data/test_cache.json`` is re-scored with
    ``test_models.check_result`` against the matching entry of
    ``test_models.TEST_QUERIES``; cache entries whose query is not in
    ``TEST_QUERIES`` are ignored. Each model row is then enriched with
    reliability counters from ``get_all_reliability()`` and speed benchmark
    data from ``_bench_results``.

    Returns:
        ``{"models": [...], "queries": [...], "total_questions": int}`` on
        success; ``{"models": [], "queries": []}`` when no cache file exists;
        ``{"error": str}`` when ``test_models`` cannot be imported or the
        cache file cannot be read or parsed.
    """
    import json as _json
    from collections import defaultdict

    CACHE_FILE = "data/test_cache.json"

    try:
        import importlib
        tm = importlib.import_module("test_models")
        TEST_QUERIES = tm.TEST_QUERIES
        check_result = tm.check_result
    except Exception as e:
        return {"error": str(e)}

    if not os.path.exists(CACHE_FILE):
        return {"models": [], "queries": []}

    # A truncated or corrupt cache is reported the same way as an import
    # failure instead of surfacing as an unhandled 500.
    try:
        with open(CACHE_FILE) as f:
            cache = _json.load(f)
    except (OSError, ValueError) as e:
        return {"error": f"cannot read {CACHE_FILE}: {e}"}

    # Build query index
    query_map = {t["query"]: t for t in TEST_QUERIES}

    # Reconstruct per-model results
    model_data = defaultdict(lambda: {
        "pass": 0, "fail": 0,
        "by_type": defaultdict(lambda: {"pass": 0, "fail": 0}),
        "details": {},
        "latest_ts": 0,
    })
    for entry in cache.values():
        model = entry["model"]
        query = entry["query"]
        reply = entry.get("reply", "")
        ts = entry.get("timestamp", 0)
        test = query_map.get(query)
        if not test:
            continue
        passed, issues = check_result(reply, test)
        qtype = test.get("type", "other")
        d = model_data[model]
        d["details"][query] = {
            "passed": passed,
            "issues": issues,
            "reply": reply[:200],
            "type": qtype,
            "note": test.get("note", ""),
        }
        outcome = "pass" if passed else "fail"
        d[outcome] += 1
        d["by_type"][qtype][outcome] += 1
        if ts > d["latest_ts"]:
            d["latest_ts"] = ts

    # Build output
    models_out = []
    for name, d in model_data.items():
        total = d["pass"] + d["fail"]
        info = _ALL_MODELS.get(name, {})
        by_type = {
            k: {"pass": v["pass"], "total": v["pass"] + v["fail"]}
            for k, v in d["by_type"].items()
        }
        models_out.append({
            "id": name,
            "provider": info.get("provider", "?"),
            "free": info.get("free", True),
            "pass": d["pass"],
            "total": total,
            "score": f"{d['pass']}/{total}",
            "pct": round(d["pass"] / total * 100) if total else 0,
            "by_type": by_type,
            "details": d["details"],
            "ts": d["latest_ts"],
        })

    rel_data = get_all_reliability()
    total_questions = len(TEST_QUERIES)
    min_for_score = total_questions * 60 // 100  # need 60% done to show score
    for m in models_out:
        # Merge in reliability data
        r = rel_data.get(m["id"], {})
        attempts = r.get("attempts", 0)
        successes = r.get("successes", 0)
        m["reliability"] = round(successes / attempts * 100) if attempts else None
        m["rel_attempts"] = attempts
        m["rel_successes"] = successes
        m["rel_errors"] = r.get("errors", 0)
        m["last_error_msg"] = r.get("last_error_msg")
        m["real_ttft"] = r.get("avg_ttft")
        m["real_tok_sec"] = r.get("avg_tok_sec")
        # Merge in speed benchmark data
        b = _bench_results.get(m["id"], {})
        m["ttft"] = round(b["ttft"], 2) if b.get("ttft") else None
        m["tok_sec"] = round(b["tok_sec"], 1) if b.get("tok_sec") else None
        m["latency"] = round(b["latency"], 2) if b.get("latency") else None
        m["incomplete"] = m["total"] < min_for_score

    # Best pass percentage first; ties broken by number of answered questions.
    models_out.sort(key=lambda m: (-m["pct"], -m["total"]))

    # Use the same defaults as the scoring loop above, so a test without
    # "type"/"note" cannot score successfully and then fail here.
    query_list = [
        {
            "query": t["query"],
            "type": t.get("type", "other"),
            "note": t.get("note", ""),
        }
        for t in TEST_QUERIES
    ]
    return {"models": models_out, "queries": query_list, "total_questions": total_questions}
@app.get("/api/provider-reliability")
async def provider_reliability():
    """Return reliability aggregated per provider.

    Groups the per-model records from ``get_all_reliability()`` by the
    provider listed in ``_ALL_MODELS`` (``"unknown"`` when the model is not
    listed), sums attempts/successes/errors, and attaches speed benchmark
    figures from ``_bench_results`` to each model.

    Returns:
        ``{"providers": [...]}`` sorted by provider reliability, highest
        first; providers without attempts have ``reliability`` of ``None``
        and sort as 0.
    """
    from collections import defaultdict

    rel_data = get_all_reliability()
    providers: dict = defaultdict(lambda: {
        "attempts": 0, "successes": 0, "errors": 0,
        "models": [], "last_error_msg": None,
    })
    for model_name, r in rel_data.items():
        info = _ALL_MODELS.get(model_name, {})
        prov = info.get("provider", "unknown")
        # Read every counter through .get() so a partially populated record
        # (e.g. attempts present, successes absent) cannot raise KeyError.
        attempts = r.get("attempts", 0)
        successes = r.get("successes", 0)
        errors = r.get("errors", 0)
        p = providers[prov]
        p["attempts"] += attempts
        p["successes"] += successes
        p["errors"] += errors
        # The provider-level message is whichever model's error was seen last
        # in iteration order.
        if r.get("last_error_msg"):
            p["last_error_msg"] = r["last_error_msg"]
        model_rel = round(successes / attempts * 100) if attempts else None
        bench = _bench_results.get(model_name, {})
        p["models"].append({
            "id": model_name,
            "free": info.get("free", True),
            "attempts": attempts,
            "successes": successes,
            "errors": errors,
            "reliability": model_rel,
            "last_error_msg": r.get("last_error_msg"),
            "ttft": round(bench["ttft"], 2) if bench.get("ttft") else None,
            "tok_sec": round(bench["tok_sec"], 1) if bench.get("tok_sec") else None,
        })

    out = []
    for prov, p in providers.items():
        pct = round(p["successes"] / p["attempts"] * 100) if p["attempts"] else None
        p["models"].sort(key=lambda m: -(m["reliability"] or 0))
        # Compute average TTFT and tok/s for the provider
        ttfts = [m["ttft"] for m in p["models"] if m["ttft"] is not None]
        toks = [m["tok_sec"] for m in p["models"] if m["tok_sec"] is not None]
        out.append({
            "provider": prov,
            "attempts": p["attempts"],
            "successes": p["successes"],
            "errors": p["errors"],
            "reliability": pct,
            "last_error_msg": p["last_error_msg"],
            "avg_ttft": round(sum(ttfts) / len(ttfts), 2) if ttfts else None,
            "avg_tok_sec": round(sum(toks) / len(toks), 1) if toks else None,
            "models": p["models"],
        })
    out.sort(key=lambda p: -(p["reliability"] or 0))
    return {"providers": out}
@app.get("/results", response_class=HTMLResponse)
async def results_page():
    """Serve ``static/results.html`` (located next to this module) as HTML."""
    path = os.path.join(os.path.dirname(__file__), "static", "results.html")
    # Decode explicitly as UTF-8 so the page does not depend on the host's
    # locale default encoding.
    with open(path, encoding="utf-8") as f:
        content = f.read()
    return HTMLResponse(content=content)
@app.get("/providers", response_class=HTMLResponse)
async def providers_page():
    """Serve ``static/providers.html`` (located next to this module) as HTML."""
    path = os.path.join(os.path.dirname(__file__), "static", "providers.html")
    # Decode explicitly as UTF-8 so the page does not depend on the host's
    # locale default encoding.
    with open(path, encoding="utf-8") as f:
        content = f.read()
    return HTMLResponse(content=content)
@app.get("/api/stt/usage")
async def stt_usage():
    """Return cumulative STT usage from server-side log.

    Reads ``_STT_USAGE_FILE`` line by line (one JSON object per line) and
    sums ``duration_s`` and ``cost_est``. Lines that are not valid JSON
    objects, or whose values cannot be added, are skipped entirely. A missing
    log file yields all-zero totals.

    Returns:
        ``{"sessions": int, "total_s": float, "total_cost_usd": float}``.
    """
    import json as _j

    total_s = 0.0
    total_cost = 0.0
    sessions = 0
    try:
        with open(_STT_USAGE_FILE) as f:
            for line in f:
                try:
                    e = _j.loads(line)
                    # Compute both new totals before committing either, so a
                    # half-valid entry cannot add seconds without being
                    # counted as a session.
                    next_total_s = total_s + e.get("duration_s", 0)
                    next_total_cost = total_cost + e.get("cost_est", 0)
                except (ValueError, TypeError, AttributeError):
                    # Malformed line: bad JSON, not an object, or
                    # non-numeric values.
                    continue
                total_s = next_total_s
                total_cost = next_total_cost
                sessions += 1
    except FileNotFoundError:
        # Nothing has been logged yet.
        pass
    return {
        "sessions": sessions,
        "total_s": round(total_s, 1),
        "total_cost_usd": round(total_cost, 6),
    }
@app.get("/api/stt/check")
async def stt_check():
    """Report whether Deepgram STT can be used.

    Availability requires ``DEEPGRAM_API_KEY`` to be set and Deepgram to
    answer a GET on ``/v1/auth/token`` with HTTP 200.

    Returns:
        ``{"available": True}`` on success, otherwise
        ``{"available": False, "reason": ...}`` where reason is ``no_key``,
        ``invalid_key`` (401/403), ``http_<status>`` or ``unreachable``.
    """
    import httpx as _httpx

    token = os.getenv("DEEPGRAM_API_KEY")
    if not token:
        return {"available": False, "reason": "no_key"}

    # /v1/auth/token works for all key types including scoped STT-only keys
    try:
        async with _httpx.AsyncClient(timeout=5) as http:
            resp = await http.get(
                "https://api.deepgram.com/v1/auth/token",
                headers={"Authorization": f"Token {token}"},
            )
            status = resp.status_code
            if status == 200:
                return {"available": True}
            if status in (401, 403):
                why = "invalid_key"
            else:
                why = f"http_{status}"
            return {"available": False, "reason": why}
    except Exception:
        # Any transport-level failure is reported as unreachable.
        return {"available": False, "reason": "unreachable"}
# Usage log location. Despite the .json suffix, the file holds one JSON
# object per line, appended by _log_stt_usage.
_STT_USAGE_FILE = os.path.join("data", "stt_usage.json")


def _log_stt_usage(ip: str, lang: str, model: str, duration_s: float):
    """Append STT usage entry to data/stt_usage.json.

    Best-effort: any failure to create the directory or write the entry is
    printed and swallowed, so callers (including ``finally`` blocks) are never
    interrupted by logging problems.

    Args:
        ip: Client address recorded with the entry.
        lang: Language code the session requested.
        model: Model name; used to pick the per-minute price for the
            cost estimate (unknown models use 0.0043).
        duration_s: Session length in seconds.
    """
    import json as _j

    cost_per_min = {"nova-3": 0.0043, "nova-2": 0.0036}
    entry = {
        "ts": _rl_time.strftime("%Y-%m-%dT%H:%M:%S"),
        "ip": ip,
        "lang": lang,
        "model": model,
        "duration_s": round(duration_s, 1),
        "cost_est": round(duration_s / 60 * cost_per_min.get(model, 0.0043), 6),
    }
    try:
        # Directory creation is inside the guarded region too: on a read-only
        # filesystem makedirs fails just like the write would.
        os.makedirs(os.path.dirname(_STT_USAGE_FILE) or "data", exist_ok=True)
        with open(_STT_USAGE_FILE, "a") as f:
            f.write(_j.dumps(entry) + "\n")
    except Exception as err:
        print(f"[STT usage] could not write log entry: {err}", flush=True)
@app.websocket("/api/stt")
async def stt_ws(websocket: WebSocket):
    """Proxy WebSocket: browser mic → Deepgram STT → transcript events.

    Accepts the browser socket, opens a second WebSocket to Deepgram's
    ``/v1/listen`` endpoint, forwards every binary/text frame from the browser
    to Deepgram, and relays every Deepgram message back to the browser as
    text. Usage is logged via ``_log_stt_usage`` once the Deepgram phase ends.

    Query parameters read from the browser connection: ``lang`` (default
    ``cs``), ``sample_rate`` (``16000``), ``model`` (``nova-3``),
    ``endpointing`` (``300``), ``utterance_end_ms`` (omitted when empty) and
    any number of ``keywords``.

    The socket is closed with 1011 when the ``websockets`` package is missing
    and with 1008 when ``DEEPGRAM_API_KEY`` is not set.
    """
    await websocket.accept()
    import asyncio as _asyncio
    import json as _json
    from urllib.parse import quote as _quote, urlencode as _urlencode
    try:
        import websockets as _ws
    except ImportError:
        await websocket.close(code=1011, reason="websockets not installed on server")
        return
    api_key = os.getenv("DEEPGRAM_API_KEY")
    if not api_key:
        await websocket.close(code=1008, reason="No DEEPGRAM_API_KEY")
        return

    _stt_start = _rl_time.time()
    _stt_ip = _get_client_ip(websocket)

    params = websocket.query_params
    lang = params.get("lang", "cs")
    sample_rate = params.get("sample_rate", "16000")
    model = params.get("model", "nova-3")
    endpointing = params.get("endpointing", "300")
    utterance_end_ms = params.get("utterance_end_ms", "")

    # Client-supplied values are percent-encoded rather than spliced into the
    # URL verbatim: a value containing "&" can no longer smuggle extra
    # Deepgram options, and keywords with spaces or non-ASCII characters
    # produce a valid URL.
    dg_query = [
        ("model", model),
        ("language", lang),
        ("encoding", "linear16"),
        ("sample_rate", sample_rate),
        ("channels", "1"),
        ("interim_results", "true"),
        ("smart_format", "true"),
        ("punctuate", "true"),
        ("endpointing", endpointing),
        ("vad_events", "true"),
    ]
    if utterance_end_ms:
        dg_query.append(("utterance_end_ms", utterance_end_ms))
    dg_query.extend(("keywords", kw) for kw in params.getlist("keywords"))
    dg_url = "wss://api.deepgram.com/v1/listen?" + _urlencode(dg_query, quote_via=_quote)

    try:
        async with _ws.connect(
            dg_url,
            additional_headers={"Authorization": f"Token {api_key}"},
            max_size=None,
        ) as dg:

            async def relay_dg():
                # Deepgram -> browser. Stops quietly once either side is gone.
                try:
                    async for msg in dg:
                        try:
                            await websocket.send_text(msg if isinstance(msg, str) else msg.decode())
                        except Exception:
                            return
                except Exception:
                    pass

            dg_task = _asyncio.create_task(relay_dg())
            try:
                # Browser -> Deepgram.
                while True:
                    msg = await websocket.receive()
                    if msg.get("type") == "websocket.disconnect":
                        break
                    if msg.get("bytes"):
                        await dg.send(msg["bytes"])
                    elif msg.get("text"):
                        await dg.send(msg["text"])
            except Exception:
                # Covers WebSocketDisconnect as well as a dropped Deepgram
                # socket; either way the session is over.
                pass
            finally:
                dg_task.cancel()
                try:
                    # Ask Deepgram to close the stream, then give it a brief
                    # moment before the connection context exits.
                    await dg.send(_json.dumps({"type": "CloseStream"}))
                    await _asyncio.sleep(0.3)
                except Exception:
                    pass
    except Exception:
        # Connecting to Deepgram (or tearing it down) failed: drop the browser.
        try:
            await websocket.close()
        except Exception:
            pass
    finally:
        _log_stt_usage(_stt_ip, lang, model, _rl_time.time() - _stt_start)
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on the
    # loopback interface, port 8000.
    import uvicorn

    uvicorn.run(app, port=8000, host="127.0.0.1")