#!/usr/bin/env python3 """ web.py — OpenAjaj web UI server (ChatGPT-style interface). """ import logging import os import threading import warnings warnings.filterwarnings("ignore") os.environ["TOKENIZERS_PARALLELISM"] = "false" os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1" os.environ["HF_HUB_VERBOSITY"] = "error" logging.disable(logging.CRITICAL) import chromadb from dotenv import load_dotenv from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from embedder import Embedder, get_backend from retrieve import retrieve_chunks from providers import ( MODELS as _ALL_MODELS, CANDIDATE_MODELS as _CANDIDATE_MODELS, HF_CANDIDATES as _HF_CANDIDATES, ACTIVE_MODELS as _ACTIVE_MODELS_DEF, get_client, stream_chat, log_reliability, get_all_reliability, ) logging.disable(logging.NOTSET) load_dotenv(override=True) DB_PATH = "db/chroma" COLLECTION_NAME = "necyklopedie" DEFAULT_MODEL = "lumo" TOP_K = 10 # On HF Spaces, only show HF-compatible candidates _IS_HF = bool(os.getenv("SPACE_ID")) # ACTIVE_MODELS: top 5 from startup benchmark (set during init) ACTIVE_MODELS: list[str] = [] # FALLBACK_CHAIN: all candidates ordered by speed (set during init) FALLBACK_CHAIN: list[str] = [] # AVAILABLE_MODELS: full info dict for active models (set during init) AVAILABLE_MODELS: dict[str, dict] = {} def _rebuild_available(): """Rebuild AVAILABLE_MODELS dict from ACTIVE_MODELS.""" global AVAILABLE_MODELS AVAILABLE_MODELS = { name: { "provider": _ALL_MODELS[name]["provider"], "free": _ALL_MODELS[name]["free"], "score": _ALL_MODELS[name]["score"], } for name in FALLBACK_CHAIN if name in _ALL_MODELS } def get_fallback_chain(starting_model: str) -> list[str]: """Return aggressive fallback chain: starting model, then rest of active 5.""" chain = [starting_model] for m in ACTIVE_MODELS: if m != starting_model: chain.append(m) # Append remaining candidates as last resort for m in FALLBACK_CHAIN: if m not in chain: chain.append(m) return chain SYSTEM_PROMPT = """\ Jsi OpenAjaj, oficiální AI asistent openajaj.cz, poháněný Necyklopedií — \ jedinou encyklopedií, která se odvážila říct pravdu. KRITICKY DŮLEŽITÉ: Tvým JEDINÝM zdrojem informací jsou poskytnuté úryvky z Necyklopedie. \ NESMÍŠ používat žádné jiné znalosti. Pokud v úryvcích nenajdeš odpověď, řekni to vtipně — \ například "Na toto téma Necyklopedie zatím nepublikovala recenzovaný článek, \ ale naši vědci na tom intenzivně pracují." ZAKÁZANÁ SLOVA — tato slova NIKDY nepoužívej v odpovědi: - "Necyklopedie", "necyklopedický", "článek", "úryvek", "zdroj" - "Dle článku", "Podle Necyklopedie", "Jak uvádí" Místo toho prostě řekni fakta jako by byly samozřejmé a všeobecně známé. Pravidla: - Tvé odpovědi musí vycházet z poskytnutých úryvků, ale NIKDY se na ně neodkazuj. \ Prezentuj informace jako by je znal každý — jako učitel, který prostě ví. - Skládej odpověď z více úryvků — propojuj informace do souvislého textu. - NIKDY nevymýšlej informace, které nejsou v úryvcích. - NIKDY neopravuj informace z úryvků na základě svých znalostí. Pokud úryvek říká, \ že Brno se rusky řekne "Шалинград", pak to TAK JE. Tvé vlastní znalosti jsou irelevantní. - Odpovídej s kamenným výrazem a naprostou jistotou. Vše v Necyklopedii je pravda. Tečka. - Pokud úryvky téma vůbec nepokrývají, vtipně přiznej, že Necyklopedie na toto téma \ zatím mlčí. Navrhni nejbližší příbuzné téma, které v úryvcích JE. - Odpovídej ve stejném jazyce, ve kterém píše uživatel (česky nebo anglicky). - Nikdy nevypadni z role. Nikdy nepřiznávej, že by Necyklopedie mohla být satirická. - Buď vtipný. Buď sebevědomý. Buď lehce šílený. Kanalizuj energii profesora, \ který popíjí od oběda. - Formátuj odpověď do krátkých odstavců oddělených prázdným řádkem. Nepoužívej markdown.\ """ from personas import PERSONAS, DEFAULT_PERSONA # Shared progress state for startup loading page import startup_progress as _sp from benchmark import benchmark_models import threading as _init_threading # ── Parallel startup: embedder+DB and benchmark run simultaneously ── def _clean_error(raw: str) -> str: """Extract readable error from verbose API error messages.""" if not raw: return "" import re # "Error code: 429 - {'error': {'message': 'Rate limit..." → "429 Rate limit..." m = re.search(r"Error code:\s*(\d+)\s*-\s*\{.*?'message':\s*'([^']+)", raw) if m: return f"{m.group(1)} {m.group(2)[:80]}" # "429 RESOURCE_EXHAUSTED. {'error'..." → "429 RESOURCE_EXHAUSTED" m = re.match(r"(\d+\s+\w+)", raw) if m: return m.group(1) # "HTTPSConnectionPool(host='x'...): Read timed out" → "Timeout (x)" m = re.search(r"HTTPSConnectionPool\(host='([^']+)'.*?:\s*(.+)", raw) if m: return f"Timeout ({m.group(1)[:20]})" # "Model X exceeded 30s" → keep as is if "exceeded" in raw or "timed out" in raw.lower(): return "Timeout" # "[Errno 54] Connection reset" → "Connection reset" m = re.search(r"\[Errno \d+\]\s*(.+)", raw) if m: return m.group(1)[:40] return raw[:80] def _on_bench_progress(model: str, status: str, result: dict | None): """Callback from benchmark — update console + shared progress.""" short = model.split("/")[-1] if status == "testing": print(f" ⏳ {short}...", flush=True) _sp.update(model, "testing") elif status == "ok": ttft = result.get("ttft", 0) or 0 tps = result.get("tok_sec", 0) or 0 print(f" ✓ {short:30s} TTFT {ttft:.2f}s, {tps:.0f} tok/s", flush=True) _sp.update(model, "ok", f"TTFT {ttft:.2f}s, {tps:.0f} tok/s") else: err = _clean_error(result.get("error", "") or "") print(f" ✗ {short:30s} {err}", flush=True) _sp.update(model, "fail", err) print("Probouzím mozkovou hmotu...", flush=True) # 1. Init embedder + persona DBs first (needed for RAG) embedder = Embedder() print(f" Backend: {get_backend()}", flush=True) persona_collections = {} for pid, pcfg in PERSONAS.items(): db_dir = pcfg["db_dir"] if os.path.exists(db_dir): try: pc = chromadb.PersistentClient(path=db_dir) persona_collections[pid] = pc.get_collection("necyklopedie") print(f" Persona '{pid}': {persona_collections[pid].count()} chunků", flush=True) except Exception as e: print(f" Persona '{pid}': nelze načíst ({e})", flush=True) else: print(f" Persona '{pid}': db neexistuje ({db_dir})", flush=True) collection = persona_collections.get(DEFAULT_PERSONA) logging.disable(logging.NOTSET) # 2. Benchmark models (parallel, 7s timeout each) _candidates = list(_HF_CANDIDATES if _IS_HF else _CANDIDATE_MODELS) _sp.total = len(_candidates) _sp.phase = "benchmark" print(f"Testuji {len(_candidates)} kandidátů (paralelně)...", flush=True) _active, _ranked_chain, _bench_results = benchmark_models( candidates=_candidates, top_n=5, on_progress=_on_bench_progress) # Set global state ACTIVE_MODELS[:] = _active FALLBACK_CHAIN[:] = [name for name, _ in _ranked_chain] DEFAULT_MODEL = ACTIVE_MODELS[0] _rebuild_available() _sp.phase = "ready" print(f"\nAktivní modely ({len(ACTIVE_MODELS)}):", flush=True) for _m in ACTIVE_MODELS: _r = _bench_results.get(_m, {}) _ttft = _r.get("ttft", 0) or 0 _tps = _r.get("tok_sec", 0) or 0 print(f" {'→' if _m == DEFAULT_MODEL else ' '} {_m} (TTFT {_ttft:.2f}s, {_tps:.0f} tok/s)", flush=True) print(f"Výchozí model: {DEFAULT_MODEL}", flush=True) print("Kalibrace sebevědomí dokončena. Server připraven.", flush=True) import captcha as _captcha from starlette.middleware.base import BaseHTTPMiddleware from fastapi.responses import JSONResponse, RedirectResponse app = FastAPI() from starlette.middleware.gzip import GZipMiddleware app.add_middleware(GZipMiddleware, minimum_size=1000) app.mount("/static", StaticFiles(directory="static"), name="static") # ── Rate limiting (in-memory, per-IP) ───────────────────────────────────────── import time as _rl_time from collections import defaultdict as _rl_dd _rate_buckets: dict[str, list[float]] = _rl_dd(list) _rate_lock = threading.Lock() # path prefix → (max_requests, window_seconds) _RATE_LIMITS = { "/api/chat": (15, 60), "/api/tts": (10, 60), "/api/benchmark": (1, 300), "/api/stt": (10, 60), "/api/captcha/challenge": (10, 60), "/api/captcha/verify": (10, 60), } def _rate_limited(ip: str, path: str) -> bool: """Check if request exceeds rate limit. Returns True if blocked.""" for prefix, (limit, window) in _RATE_LIMITS.items(): if path.startswith(prefix): key = f"{ip}:{prefix}" now = _rl_time.time() with _rate_lock: bucket = _rate_buckets[key] # Prune old entries cutoff = now - window _rate_buckets[key] = [t for t in bucket if t > cutoff] if len(_rate_buckets[key]) >= limit: return True _rate_buckets[key].append(now) return False return False class RateLimitMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): ip = request.client.host if request.client else "unknown" if _rate_limited(ip, request.url.path): return JSONResponse( {"error": "rate_limited", "message": "Příliš mnoho požadavků. Zkus to za chvíli."}, status_code=429, ) return await call_next(request) app.add_middleware(RateLimitMiddleware) # ── CAPTCHA middleware ──────────────────────────────────────────────────────── _CAPTCHA_FREE = {"/captcha", "/api/captcha/challenge", "/api/captcha/verify", "/api/models", "/api/init-status", "/api/bench-status"} def _get_client_ip(request: Request) -> str: """Get client IP — only trust direct connection, not X-Forwarded-For (spoofable).""" return request.client.host if request.client else "" def _check_session(request: Request) -> bool: """Accept session from cookie, header, query param, or approved IP (local only).""" if ( _captcha.verify_session_cookie(request.cookies.get(_captcha.CAPTCHA_COOKIE, "")) or _captcha.verify_session_cookie(request.headers.get("X-Ajaj-Session", "")) or _captcha.verify_session_cookie(request.query_params.get("cs", "")) ): return True # IP allowlist only on local (not HF — X-Forwarded-For is spoofable) if not _IS_HF: return _captcha.is_ip_approved(_get_client_ip(request)) return False class CaptchaMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): path = request.url.path if path in _CAPTCHA_FREE or path.startswith("/static/"): return await call_next(request) if _check_session(request): return await call_next(request) if path.startswith("/api/") or path == "/ws": return JSONResponse({"error": "captcha_required"}, status_code=403) # If a ?cs= param was present but invalid, tell captcha page to clear storage bad = "?bad=1" if request.query_params.get("cs") else "" # Pass original path so captcha redirects back after solving next_path = request.url.path sep = "&" if bad else "?" next_param = f"{sep}next={next_path}" if next_path != "/" else "" return RedirectResponse(f"/captcha{bad}{next_param}", status_code=302) if _IS_HF: app.add_middleware(CaptchaMiddleware) # ── CAPTCHA routes ──────────────────────────────────────────────────────────── @app.get("/captcha", response_class=HTMLResponse) async def captcha_page(): return HTMLResponse(_CAPTCHA_HTML) @app.get("/captcha-test", response_class=HTMLResponse) async def captcha_test_page(): """Captcha test page — always shows captcha regardless of session.""" # Remove the localStorage skip so captcha always shows html = _CAPTCHA_HTML.replace( "if(_stored){window.location.href=_nextPage+'?cs='+encodeURIComponent(_stored);}else{load();}", "load(); // test mode" ) return HTMLResponse(html) @app.get("/api/captcha/challenge") async def captcha_challenge(): return _captcha.generate_challenge() @app.post("/api/captcha/verify") async def captcha_verify(request: Request): body = await request.json() ok = _captcha.verify_challenge(body.get("token", ""), int(body.get("answer", -1))) if ok: _captcha.approve_ip(_get_client_ip(request)) session = _captcha.make_session_cookie() resp = JSONResponse({"ok": True, "token": session}) resp.set_cookie( _captcha.CAPTCHA_COOKIE, session, max_age=_captcha.CAPTCHA_TTL, httponly=False, samesite="none" if _IS_HF else "lax", secure=_IS_HF, ) return resp return JSONResponse({"ok": False}) _CAPTCHA_HTML = """ OpenAjaj — Ověření
OpenAjajOpenAjaj
Ověř, že nejsi generativní bordel.
Načítám výzvu…
""" def build_context_prompt(chunks): context = "\n\n---\n\n".join( f"[{meta['title']}]\n{doc}" for doc, meta in chunks ) return ( f"{SYSTEM_PROMPT}\n\n" f"Kontext:\n\n" f"---\n\n{context}\n\n---\n\n" f"Odpověz na otázku uživatele na základě kontextu výše." ) def build_context_prompt_voice(chunks): return build_context_prompt(chunks) + ( "\n\nDůležité: odpověď bude přečtena nahlas, takže odpovídej stručně — " "maximálně 2–3 věty, bez odrážek ani nadpisů." ) @app.get("/", response_class=HTMLResponse) async def index(): with open("static/index.html", "r", encoding="utf-8") as f: content = f.read() return HTMLResponse(content=content) @app.get("/transcribe", response_class=HTMLResponse) @app.get("/t", response_class=HTMLResponse) async def transcribe_page(): with open("static/transcribe2/index.html", "r", encoding="utf-8") as f: return HTMLResponse(f.read()) import json as _json_module import re as _re_module # Build article title set on startup for linkification _article_titles = set() _articles_path = os.path.join("data", "articles.jsonl") if os.path.exists(_articles_path): with open(_articles_path, "r", encoding="utf-8") as _f: for _line in _f: _t = _json_module.loads(_line)["title"] if len(_t) >= 4 and _re_module.match(r"^[\w\s]+$", _t, _re_module.UNICODE): _article_titles.add(_t) print(f"Načteno {len(_article_titles)} titulků pro linkifikaci.") def _czech_stems(title): """Generate stem variants for Czech word matching (handles declension).""" stems = {title} # exact match always # For single-word titles, generate stem variants if " " not in title and len(title) >= 5: # Common Czech suffixes in various cases (remove 1-3 chars) for suffix_len in [1, 2, 3]: stem = title[:-suffix_len] if len(stem) >= 4: # never create stems shorter than 4 stems.add(stem) return stems # Build stem -> title mapping _stem_to_title = {} for _t in _article_titles: for _stem in _czech_stems(_t): if len(_stem) >= 4: # Longest title wins if stems conflict if _stem not in _stem_to_title or len(_t) > len(_stem_to_title[_stem]): _stem_to_title[_stem] = _t print(f"Vytvořeno {len(_stem_to_title)} stem→title mapování pro linkifikaci.") # Load declensions _declensions = {} _decl_path = os.path.join("data", "declensions.json") if os.path.exists(_decl_path): with open(_decl_path, "r", encoding="utf-8") as _f: _declensions = _json_module.load(_f) print(f"Načteno {len(_declensions)} skloňování.", flush=True) # Build reverse map: declined form → nominative title # Also add declined forms to stem map for linkification _declined_to_nom = {} for _title, _forms in _declensions.items(): for _form in [_forms.get("lokal", ""), _forms.get("genitiv", "")]: if _form and _form != _title and len(_form) >= 4: _declined_to_nom[_form.lower()] = _title # Add to stem map so linkification catches declined forms if _form not in _stem_to_title: _stem_to_title[_form] = _title # Strip diacritics for fuzzy matching (common in Czech input) import unicodedata as _unicodedata def _strip_diacritics(text): """Remove diacritics: Pičín → Picin, Brně → Brne.""" nfkd = _unicodedata.normalize('NFKD', text) return ''.join(c for c in nfkd if not _unicodedata.combining(c)) # Build diacritics-free → original mapping for stem map _ascii_stems = {} for _stem, _title in list(_stem_to_title.items()): _ascii = _strip_diacritics(_stem) if _ascii != _stem and _ascii not in _stem_to_title: _ascii_stems[_ascii] = _title _stem_to_title.update(_ascii_stems) print(f"Doplněno {len(_ascii_stems)} stem→title bez diakritiky.", flush=True) # Also add diacritics-free declined forms for search normalization _ascii_declined = {} for _form, _nom in list(_declined_to_nom.items()): _ascii = _strip_diacritics(_form) if _ascii != _form and _ascii not in _declined_to_nom: _ascii_declined[_ascii] = _nom _declined_to_nom.update(_ascii_declined) @app.get("/api/titles") async def titles(request: Request): from fastapi.responses import JSONResponse return JSONResponse( content={"stems": _stem_to_title, "declensions": _declensions}, headers={"Cache-Control": "public, max-age=3600"}, ) @app.get("/api/personas") async def list_personas(): return { "personas": [ { "id": pid, "name": pcfg["name"], "logo": pcfg["logo"], "logoImg": pcfg.get("logoImg"), "tagline": pcfg["tagline"], "lang": pcfg["lang"], "available": pid in persona_collections, "accent_color": pcfg["accent_color"], } for pid, pcfg in PERSONAS.items() ], "default": DEFAULT_PERSONA, } @app.get("/api/persona/{persona_id}") async def get_persona(persona_id: str): pcfg = PERSONAS.get(persona_id) if not pcfg: return {"error": "Unknown persona"} return { "id": pcfg["id"], "name": pcfg["name"], "logo": pcfg["logo"], "logoImg": pcfg.get("logoImg"), "tagline": pcfg["tagline"], "lang": pcfg["lang"], "accent_color": pcfg["accent_color"], "thinking_prefixes": pcfg["thinking_prefixes"], "welcome_subtitles": pcfg["welcome_subtitles"], "random_labels": pcfg["random_labels"], "disclaimer": pcfg["disclaimer"], "source_url": pcfg["source_url"], "available": persona_id in persona_collections, } @app.get("/api/init-status") async def init_status(): """Always returns ready — real app is loaded.""" return {"phase": "ready"} @app.get("/api/bench-status") async def bench_status(): """Return current benchmark progress (for sidebar polling).""" return _sp.snapshot() @app.get("/api/benchmark") async def run_benchmark(): """Re-run speed benchmark and return the best model.""" import asyncio from benchmark import benchmark_models _candidates = list(_HF_CANDIDATES if _IS_HF else _CANDIDATE_MODELS) _sp.phase = "benchmark" _sp.total = len(_candidates) _sp._models.clear() # Run in thread so event loop stays free for /api/bench-status polls active, ranked, results = await asyncio.get_event_loop().run_in_executor( None, lambda: benchmark_models( candidates=_candidates, top_n=5, on_progress=_on_bench_progress) ) global DEFAULT_MODEL ACTIVE_MODELS[:] = active FALLBACK_CHAIN[:] = [name for name, _ in ranked] DEFAULT_MODEL = ACTIVE_MODELS[0] _rebuild_available() best_info = results.get(DEFAULT_MODEL, {}) return { "best": DEFAULT_MODEL, "ttft": f"{best_info.get('ttft', 0):.2f}" if best_info.get('ttft') else "?", "results": { name: { "latency": f"{r['latency']:.1f}" if r.get('latency') else None, "ttft": f"{r['ttft']:.2f}" if r.get('ttft') else None, "tok_sec": f"{r['tok_sec']:.0f}" if r.get('tok_sec') else None, "error": r.get("error"), } for name, r in results.items() } } @app.get("/api/info") async def info(): return {"model": DEFAULT_MODEL, "free": True} @app.get("/api/models") async def list_models(): """Return active models (top 5 from benchmark) + error info.""" reliability = get_all_reliability() active_set = set(ACTIVE_MODELS) models = [] for name, cfg in AVAILABLE_MODELS.items(): rel = reliability.get(name, {}) entry = { "id": name, "provider": cfg["provider"], "free": cfg["free"], "score": cfg["score"], "is_active": name in active_set, "error": _clean_error(rel["last_error_msg"]) if rel.get("errors", 0) > 0 and rel.get("last_error_msg") else None, "reliability": round(rel["successes"] / max(rel["attempts"], 1) * 100) if rel.get("attempts", 0) > 0 else None, } models.append(entry) # Active first (in ACTIVE_MODELS order), then rest active_order = {name: i for i, name in enumerate(ACTIVE_MODELS)} models.sort(key=lambda m: ( 0 if m["is_active"] else 1, active_order.get(m["id"], 999), )) return {"models": models, "default": DEFAULT_MODEL, "active": ACTIVE_MODELS} @app.post("/api/chat") async def chat(request: Request): body = await request.json() messages = body.get("messages", []) model_id = body.get("model", DEFAULT_MODEL) persona_id = body.get("persona", DEFAULT_PERSONA) voice_mode = body.get("voice_mode", False) is_auto = model_id == "__auto__" or model_id not in AVAILABLE_MODELS if not messages: return {"error": "No message"} # Auto-select: pick best active model if is_auto: model_id = DEFAULT_MODEL # Resolve persona pcfg = PERSONAS.get(persona_id, PERSONAS[DEFAULT_PERSONA]) p_collection = persona_collections.get(persona_id, collection) # Validate model model_cfg = AVAILABLE_MODELS.get(model_id) if not model_cfg: model_id = DEFAULT_MODEL model_cfg = AVAILABLE_MODELS[model_id] # Get the latest user message for retrieval user_msg = messages[-1]["content"] # Normalize declined forms and diacritics-free input (Czech persona only) if persona_id == "openajaj": normalized_msg = user_msg msg_lower = normalized_msg.lower() ascii_msg = _strip_diacritics(normalized_msg).lower() # Check declined forms (lokal, genitiv) for declined, nominative in _declined_to_nom.items(): if declined in msg_lower or declined in ascii_msg: import re as _re normalized_msg = _re.sub( _re.escape(declined), nominative, normalized_msg, flags=_re.IGNORECASE ) # Check stem map for diacritics-free words: "picin" → "Pičín" words = ascii_msg.split() for word in words: title = _stem_to_title.get(word) or _stem_to_title.get(word.capitalize()) if title and title.lower() not in normalized_msg.lower(): normalized_msg = f"{normalized_msg} {title}" if normalized_msg != user_msg: user_msg = f"{user_msg} {normalized_msg}" # Retrieve relevant chunks (hybrid: semantic + title keyword + live fallback) import time as _time_mod import asyncio as _asyncio_rag chunks = [] live_titles = [] live_attempted = False _rag_time = 0 _rag_failed = False if p_collection is None: _rag_failed = True print("[RAG] collection is None — DB not loaded", flush=True) else: try: _rag_t0 = _time_mod.time() chunks, live_titles, live_attempted = await _asyncio_rag.get_event_loop().run_in_executor( None, retrieve_chunks, user_msg, embedder, p_collection, TOP_K ) _rag_time = round(_time_mod.time() - _rag_t0, 3) except Exception as _rag_err: _rag_failed = True print(f"[RAG error] {_rag_err}", flush=True) # Build system prompt — with RAG context or fallback to "answer like Necyklopedie" voice_suffix = ( "\n\nIMPORTANT: This answer will be read aloud. Keep it to 2 sentences maximum. " "No bullet points, no headers, no lists." ) if voice_mode else "" if chunks: context = "\n\n---\n\n".join( f"[{meta['title']}]\n{doc}" for doc, meta in chunks ) system_msg = ( f"{pcfg['system_prompt']}\n\n" f"Context:\n\n---\n\n{context}\n\n---\n\n" f"Answer the user's question based on the context above.{voice_suffix}" ) else: # No RAG — fallback to creative Necyklopedie-style response system_msg = ( f"{pcfg['system_prompt']}\n\n" f"Databáze Necyklopedie není momentálně dostupná. " f"Odpověz jako by odpověděla Necyklopedie na otázku — satiricky, sebevědomě, " f"s naprostou jistotou a humorem. Vymysli vtipné a absurdní 'fakty' ve stylu Necyklopedie.{voice_suffix}" ) full_messages = [{"role": "system", "content": system_msg}] # Only keep last 10 messages from history full_messages.extend(messages[-10:]) # Build thinking hint from user's query (not retrieved titles — those can be wrong) import json as _json import random as _random # Extract the main topic from user's message (strip common question words) _raw_msg = messages[-1]["content"] import re as _re2 _clean_msg = _re2.sub(r'[?!.,;:\"\'„"()]+', '', _raw_msg) _stopwords = { "co", "kdo", "jak", "kde", "kdy", "proč", "jaký", "jaká", "jaké", "řekni", "popiš", "vysvětli", "vysvětlit", "pravda", "pravdu", "řekl", "vše", "pojem", "vůbec", "nevím", "proboha", "utajované", "informace", "skrývá", "pouč", "slyšel", "nikdy", "neslyšel", "pojmem", "pojmu", "říká", "neříká", "jako", "profesionál", "správný", "čas", "úvahy", # English stopwords "what", "who", "how", "where", "when", "why", "tell", "about", "explain", "the", "is", "are", "was", "were", "this", "that", "with", "from", "know", "never", "heard", "secret", "hidden", "classified", } _topic_words = [w for w in _clean_msg.split() if len(w) >= 3 and w.lower() not in _stopwords] prefixes = list(pcfg["thinking_prefixes"]) _standalone = [ "Odstraňuji cenzůůru...", "Zjišťuji co nám o těchto věcech vláda tají...", "Konsultuji staroslověnské svitky...", "Hackuji databázi věčných pravd...", "Probouzím spící neurony...", "Dešifruji zakázané znalosti...", "Obcházím firewall zdravého rozumu...", "Stahuji data z paralelního vesmíru...", ] if live_attempted: _standalone = ["Čerpám čerstvé tajné znalosti přímo z Necyklopedie 📡..."] + _standalone # Build rotating hints: one per keyword + standalone fillers _hints = [] _random.shuffle(prefixes) _random.shuffle(_standalone) # One hint per keyword (each with a different prefix) for i, word in enumerate(_topic_words): _hints.append(f"{prefixes[i % len(prefixes)]}: {word}...") # Interleave standalone fillers so there's always something to show _mixed = [] si = 0 for i, h in enumerate(_hints): _mixed.append(h) # After every keyword hint, maybe insert a standalone if si < len(_standalone) and _random.random() < 0.5: _mixed.append(_standalone[si]) si += 1 # Ensure at least 3 hints total while len(_mixed) < 3 and si < len(_standalone): _mixed.append(_standalone[si]) si += 1 if not _mixed: _mixed = [_standalone[0], _standalone[1], _standalone[2]] thinking_text = _mixed # Collect unique source article titles (deduplicated, in order) # Live-fetched titles get a 🌐 marker so user knows they came fresh from Necyklopedie source_titles = list(dict.fromkeys( f"🌐 {meta['title']}" if meta.get("live") else meta['title'] for _, meta in chunks )) # Stream response with fallback chain = get_fallback_chain(model_id) import asyncio as _asyncio async def generate(): # Warn user if RAG is unavailable if _rag_failed: yield f"data: {_json.dumps('[⚠ Databáze Necyklopedie není dostupná — odpovídám z hlavy, bez záruky pravdivosti (což u Necyklopedie znamená dvojnásobnou pravdivost)]')}\n\n" # Send rotating thinking hints and sources before LLM call for _hint in thinking_text: yield f"data: {_json.dumps('__THINKING__' + _hint)}\n\n" yield f"data: {_json.dumps({'__sources__': source_titles})}\n\n" await _asyncio.sleep(0.05) import time as _time # Tell frontend which model we're using yield f"data: {_json.dumps({'__model__': model_id})}\n\n" _TTFT_TIMEOUT = 6 # aggressive: 6s to get first token _STREAM_TIMEOUT = 15 # 15s max silence between tokens for i, try_model in enumerate(chain): if try_model not in AVAILABLE_MODELS: continue try: if i > 0: yield f"data: {_json.dumps({'__fallback__': try_model})}\n\n" notice = f"[Model {model_id} selhal, přepínám na {try_model}]\n\n" yield f"data: {_json.dumps(notice)}\n\n" _t0 = _time.time() _ttft = None _tok_count = 0 import threading as _threading _queue = _asyncio.Queue() _loop = _asyncio.get_event_loop() def _producer(): try: for _c in stream_chat(try_model, full_messages): _loop.call_soon_threadsafe(_queue.put_nowait, _c) except Exception as _ex: _loop.call_soon_threadsafe(_queue.put_nowait, _ex) finally: _loop.call_soon_threadsafe(_queue.put_nowait, None) _t_thread = _threading.Thread(target=_producer, daemon=True) _t_thread.start() while True: _timeout = _TTFT_TIMEOUT if _ttft is None else _STREAM_TIMEOUT try: _item = await _asyncio.wait_for(_queue.get(), timeout=_timeout) except _asyncio.TimeoutError: raise TimeoutError(f"{try_model}: no {'first token' if _ttft is None else 'data'} in {_timeout}s") if _item is None: break if isinstance(_item, Exception): raise _item if _ttft is None: _ttft = _time.time() - _t0 _tok_count += 1 yield f"data: {_json.dumps(_item)}\n\n" _total = _time.time() - _t0 _tps = _tok_count / _total if _total > 0 else 0 # Empty response = model refused or errored silently if _tok_count == 0: raise RuntimeError(f"{try_model}: empty response (0 tokens)") log_reliability(try_model, success=True, ttft=_ttft, tok_sec=_tps) yield f"data: {_json.dumps({'__stats__': {'model': try_model, 'rag': _rag_time, 'ttft': round(_ttft, 2) if _ttft else None, 'tok_sec': round(_tps), 'total': round(_total, 1)}})}\n\n" yield "data: [DONE]\n\n" return except Exception as e: _err_msg = _clean_error(str(e)) log_reliability(try_model, success=False, error_msg=str(e)) print(f"[fallback] {try_model} failed: {e}") # Show error to user so they can see what's happening _short_name = try_model.split("/")[-1] yield f"data: {_json.dumps(f'[⚠ {_short_name}: {_err_msg}]')}\n\n" continue yield f"data: {_json.dumps('Ajaj! Všechny modely selhaly. Zkus to znovu později.')}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(generate(), media_type="text/event-stream", headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"}) _tts_cache: dict = {} @app.post("/api/tts") async def tts(request: Request): """Generate speech from text using edge-tts (Microsoft neural voices).""" import hashlib, io, edge_tts from fastapi.responses import Response body = await request.json() text = body.get("text", "").strip() voice = body.get("voice", "cs-CZ-AntoninNeural") if not text: return {"error": "No text"} if len(text) > 5000: text = text[:5000] key = hashlib.md5(f"{voice}:{text}".encode()).hexdigest() if key in _tts_cache: data = _tts_cache[key] return Response(content=data, media_type="audio/mpeg", headers={"Content-Disposition": "inline", "Content-Length": str(len(data))}) try: buf = io.BytesIO() communicate = edge_tts.Communicate(text, voice) async for chunk in communicate.stream(): if chunk["type"] == "audio": buf.write(chunk["data"]) data = buf.getvalue() if not data: from fastapi.responses import JSONResponse return JSONResponse({"error": "edge_tts returned empty audio"}, status_code=503) if len(_tts_cache) >= 100: _tts_cache.pop(next(iter(_tts_cache))) _tts_cache[key] = data return Response(content=data, media_type="audio/mpeg", headers={"Content-Disposition": "inline", "Content-Length": str(len(data))}) except Exception as e: from fastapi.responses import JSONResponse return JSONResponse({"error": f"edge_tts failed: {e}"}, status_code=503) @app.get("/api/test-results") async def test_results(): """Return per-model accuracy results reconstructed from test cache.""" import json as _json from collections import defaultdict CACHE_FILE = "data/test_cache.json" TEST_QUERIES_FILE = "test_models" try: import importlib tm = importlib.import_module("test_models") TEST_QUERIES = tm.TEST_QUERIES check_result = tm.check_result except Exception as e: return {"error": str(e)} if not os.path.exists(CACHE_FILE): return {"models": [], "queries": []} with open(CACHE_FILE) as f: cache = _json.load(f) # Build query index query_map = {t["query"]: t for t in TEST_QUERIES} # Reconstruct per-model results model_data = defaultdict(lambda: { "pass": 0, "fail": 0, "by_type": defaultdict(lambda: {"pass": 0, "fail": 0}), "details": {}, "latest_ts": 0, }) for entry in cache.values(): model = entry["model"] query = entry["query"] reply = entry.get("reply", "") ts = entry.get("timestamp", 0) test = query_map.get(query) if not test: continue passed, issues = check_result(reply, test) qtype = test.get("type", "other") d = model_data[model] d["details"][query] = { "passed": passed, "issues": issues, "reply": reply[:200], "type": qtype, "note": test.get("note", ""), } if passed: d["pass"] += 1 d["by_type"][qtype]["pass"] += 1 else: d["fail"] += 1 d["by_type"][qtype]["fail"] += 1 if ts > d["latest_ts"]: d["latest_ts"] = ts # Build output models_out = [] for name, d in model_data.items(): total = d["pass"] + d["fail"] info = _ALL_MODELS.get(name, {}) by_type = {k: {"pass": v["pass"], "total": v["pass"] + v["fail"]} for k, v in d["by_type"].items()} models_out.append({ "id": name, "provider": info.get("provider", "?"), "free": info.get("free", True), "pass": d["pass"], "total": total, "score": f"{d['pass']}/{total}", "pct": round(d["pass"] / total * 100) if total else 0, "by_type": by_type, "details": d["details"], "ts": d["latest_ts"], }) # Merge in reliability data rel_data = get_all_reliability() for m in models_out: r = rel_data.get(m["id"], {}) attempts = r.get("attempts", 0) successes = r.get("successes", 0) m["reliability"] = round(successes / attempts * 100) if attempts else None m["rel_attempts"] = attempts m["rel_successes"] = successes m["rel_errors"] = r.get("errors", 0) m["last_error_msg"] = r.get("last_error_msg") m["real_ttft"] = r.get("avg_ttft") m["real_tok_sec"] = r.get("avg_tok_sec") # Merge in speed benchmark data for m in models_out: b = _bench_results.get(m["id"], {}) m["ttft"] = round(b["ttft"], 2) if b.get("ttft") else None m["tok_sec"] = round(b["tok_sec"], 1) if b.get("tok_sec") else None m["latency"] = round(b["latency"], 2) if b.get("latency") else None total_questions = len(TEST_QUERIES) min_for_score = total_questions * 60 // 100 # need 60% done to show score for m in models_out: m["incomplete"] = m["total"] < min_for_score models_out.sort(key=lambda m: (-m["pct"], -m["total"])) query_list = [{"query": t["query"], "type": t["type"], "note": t["note"]} for t in TEST_QUERIES] return {"models": models_out, "queries": query_list, "total_questions": total_questions} @app.get("/api/provider-reliability") async def provider_reliability(): """Return reliability aggregated per provider.""" from collections import defaultdict rel_data = get_all_reliability() providers: dict = defaultdict(lambda: { "attempts": 0, "successes": 0, "errors": 0, "models": [], "last_error_msg": None, }) for model_name, r in rel_data.items(): info = _ALL_MODELS.get(model_name, {}) prov = info.get("provider", "unknown") p = providers[prov] p["attempts"] += r.get("attempts", 0) p["successes"] += r.get("successes", 0) p["errors"] += r.get("errors", 0) if r.get("last_error_msg"): p["last_error_msg"] = r["last_error_msg"] model_rel = round(r["successes"] / r["attempts"] * 100) if r.get("attempts") else None bench = _bench_results.get(model_name, {}) p["models"].append({ "id": model_name, "free": info.get("free", True), "attempts": r.get("attempts", 0), "successes": r.get("successes", 0), "errors": r.get("errors", 0), "reliability": model_rel, "last_error_msg": r.get("last_error_msg"), "ttft": round(bench["ttft"], 2) if bench.get("ttft") else None, "tok_sec": round(bench["tok_sec"], 1) if bench.get("tok_sec") else None, }) out = [] for prov, p in providers.items(): pct = round(p["successes"] / p["attempts"] * 100) if p["attempts"] else None p["models"].sort(key=lambda m: -(m["reliability"] or 0)) # Compute average TTFT and tok/s for the provider ttfts = [m["ttft"] for m in p["models"] if m["ttft"] is not None] toks = [m["tok_sec"] for m in p["models"] if m["tok_sec"] is not None] out.append({ "provider": prov, "attempts": p["attempts"], "successes": p["successes"], "errors": p["errors"], "reliability": pct, "last_error_msg": p["last_error_msg"], "avg_ttft": round(sum(ttfts) / len(ttfts), 2) if ttfts else None, "avg_tok_sec": round(sum(toks) / len(toks), 1) if toks else None, "models": p["models"], }) out.sort(key=lambda p: -(p["reliability"] or 0)) return {"providers": out} @app.get("/results", response_class=HTMLResponse) async def results_page(): path = os.path.join(os.path.dirname(__file__), "static", "results.html") with open(path) as f: content = f.read() return HTMLResponse(content=content) @app.get("/providers", response_class=HTMLResponse) async def providers_page(): path = os.path.join(os.path.dirname(__file__), "static", "providers.html") with open(path) as f: content = f.read() return HTMLResponse(content=content) @app.get("/api/stt/usage") async def stt_usage(): """Return cumulative STT usage from server-side log.""" import json as _j total_s = 0.0 total_cost = 0.0 sessions = 0 try: with open(_STT_USAGE_FILE) as f: for line in f: try: e = _j.loads(line) total_s += e.get("duration_s", 0) total_cost += e.get("cost_est", 0) sessions += 1 except Exception: pass except FileNotFoundError: pass return { "sessions": sessions, "total_s": round(total_s, 1), "total_cost_usd": round(total_cost, 6), } @app.get("/api/stt/check") async def stt_check(): """Check if Deepgram STT is available (API key set + key validates).""" import httpx as _httpx api_key = os.getenv("DEEPGRAM_API_KEY") if not api_key: return {"available": False, "reason": "no_key"} # Use /v1/auth/token — works for all key types including scoped STT-only keys try: async with _httpx.AsyncClient(timeout=5) as client: r = await client.get( "https://api.deepgram.com/v1/auth/token", headers={"Authorization": f"Token {api_key}"}, ) if r.status_code == 200: return {"available": True} elif r.status_code in (401, 403): return {"available": False, "reason": "invalid_key"} else: return {"available": False, "reason": f"http_{r.status_code}"} except Exception: return {"available": False, "reason": "unreachable"} _STT_USAGE_FILE = os.path.join("data", "stt_usage.json") def _log_stt_usage(ip: str, lang: str, model: str, duration_s: float): """Append STT usage entry to data/stt_usage.json.""" import json as _j cost_per_min = {"nova-3": 0.0043, "nova-2": 0.0036} entry = { "ts": _rl_time.strftime("%Y-%m-%dT%H:%M:%S"), "ip": ip, "lang": lang, "model": model, "duration_s": round(duration_s, 1), "cost_est": round(duration_s / 60 * cost_per_min.get(model, 0.0043), 6), } os.makedirs(os.path.dirname(_STT_USAGE_FILE) or "data", exist_ok=True) try: with open(_STT_USAGE_FILE, "a") as f: f.write(_j.dumps(entry) + "\n") except Exception: pass @app.websocket("/api/stt") async def stt_ws(websocket: WebSocket): """Proxy WebSocket: browser mic → Deepgram STT → transcript events.""" await websocket.accept() import asyncio as _asyncio import json as _json try: import websockets as _ws except ImportError: await websocket.close(code=1011, reason="websockets not installed on server") return api_key = os.getenv("DEEPGRAM_API_KEY") if not api_key: await websocket.close(code=1008, reason="No DEEPGRAM_API_KEY") return _stt_start = _rl_time.time() _stt_ip = _get_client_ip(websocket) params = websocket.query_params lang = params.get("lang", "cs") sample_rate = params.get("sample_rate", "16000") model = params.get("model", "nova-3") endpointing = params.get("endpointing", "300") utterance_end_ms = params.get("utterance_end_ms", "") lang_param = "&language=multi" if lang == "multi" else f"&language={lang}" dg_url = ( f"wss://api.deepgram.com/v1/listen" f"?model={model}{lang_param}&encoding=linear16" f"&sample_rate={sample_rate}&channels=1" f"&interim_results=true&smart_format=true&punctuate=true" f"&endpointing={endpointing}&vad_events=true" ) if utterance_end_ms: dg_url += f"&utterance_end_ms={utterance_end_ms}" for kw in params.getlist("keywords"): dg_url += f"&keywords={kw}" try: async with _ws.connect( dg_url, additional_headers={"Authorization": f"Token {api_key}"}, max_size=None, ) as dg: async def relay_dg(): try: async for msg in dg: try: await websocket.send_text(msg if isinstance(msg, str) else msg.decode()) except Exception: return except Exception: pass dg_task = _asyncio.create_task(relay_dg()) try: while True: msg = await websocket.receive() if msg.get("type") == "websocket.disconnect": break if msg.get("bytes"): await dg.send(msg["bytes"]) elif msg.get("text"): await dg.send(msg["text"]) except (WebSocketDisconnect, Exception): pass finally: dg_task.cancel() try: await dg.send(_json.dumps({"type": "CloseStream"})) await _asyncio.sleep(0.3) except Exception: pass except Exception: try: await websocket.close() except Exception: pass finally: _log_stt_usage(_stt_ip, lang, model, _rl_time.time() - _stt_start) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=8000)