import os
import logging
from datetime import datetime
from functools import lru_cache
from zoneinfo import ZoneInfo

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import gradio as gr
from openai import OpenAI

# ==============================================================================
# 1) SETUP & CONFIGURATION
# ==============================================================================
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

HYPERBOLIC_API_KEY = os.environ.get("HYPERBOLIC_API_KEY")
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
SEARCH_ENGINE_ID = os.environ.get("GOOGLE_CX")

# Only these Hugging Face accounts may use the app (compared case-insensitively).
ALLOWED_HF_USERS = {"dtometzki"}
ALLOWED_HF_USERS_LOWER = {u.lower() for u in ALLOWED_HF_USERS}

# Hard cap on how much web-search context gets appended to the user prompt.
WEB_CONTEXT_MAX_CHARS = 800

# Initialize the client only if the key is present; chat_stream() checks for None.
client = None
if HYPERBOLIC_API_KEY:
    client = OpenAI(api_key=HYPERBOLIC_API_KEY, base_url="https://api.hyperbolic.xyz/v1")

MODELS = {
    "Qwen/Qwen3-Next-80B-A3B-Instruct": {"max_tokens": 8192},
    "meta-llama/Llama-3.3-70B-Instruct": {"max_tokens": 8192},
    "deepseek-ai/DeepSeek-V3": {"max_tokens": 131072},
    "openai/gpt-oss-120b": {"max_tokens": 8192},
}
MODEL_CHOICES = list(MODELS.keys())
MAX_TOKENS_GLOBAL = max(v["max_tokens"] for v in MODELS.values())

# USD per 1M completion (output) tokens, per model.
PRICE_PER_1M_OUTPUT = {
    "Qwen/Qwen3-Next-80B-A3B-Instruct": 0.30,
    "meta-llama/Llama-3.3-70B-Instruct": 0.40,
    "deepseek-ai/DeepSeek-V3": 0.25,
    "openai/gpt-oss-120b": 0.30,
}


# ==============================================================================
# 2) HELPERS
# ==============================================================================
def cost_from_completion_tokens(model: str, completion_tokens: int) -> float:
    """Return the output-token cost in USD; 0.0 for unknown models."""
    p = float(PRICE_PER_1M_OUTPUT.get(model, 0.0))
    return (float(completion_tokens) / 1_000_000.0) * p


def _local_now(tz: str = "Europe/Berlin") -> datetime:
    """Return the current timezone-aware datetime for *tz*."""
    return datetime.now(ZoneInfo(tz))


def _truncate(text: str, max_chars: int) -> str:
    """Trim *text* to at most *max_chars* characters, appending an ellipsis if cut."""
    text = (text or "").strip()
    if len(text) <= max_chars:
        return text
    return text[: max_chars - 1].rstrip() + "…"


def _profile_username(profile) -> str:
    """Extract the username from a gr.OAuthProfile; empty string if unavailable."""
    if profile is None:
        return ""
    return (getattr(profile, "username", None) or getattr(profile, "name", None) or "").strip()


def _is_allowed(profile) -> bool:
    """True if the logged-in HF user is on the allowlist (case-insensitive)."""
    return _profile_username(profile).lower() in ALLOWED_HF_USERS_LOWER


def clamp_tokens(model: str, max_tokens) -> int:
    """Coerce *max_tokens* to an int within [1, model's max]; fall back to 2048."""
    model_max = int(MODELS.get(model, {}).get("max_tokens", 2048))
    try:
        v = int(max_tokens)
    except (TypeError, ValueError):
        v = 2048
    return max(1, min(v, model_max))


def _clean_response(text: str) -> str:
    """Strip any chain-of-thought prefix emitted before the final-answer marker.

    If the marker is absent the text is returned unchanged, so streaming never
    blocks on partial output.
    """
    marker = "<|channel|>final<|message|>"
    if marker in text:
        return text.split(marker, 1)[-1]
    return text


def content_to_text(content) -> str:
    """Normalize a message content field (str, list-of-parts, or None) to a string."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Multi-part content: concatenate the textual parts.
        return "\n".join(
            [str(p.get("text", "") or p.get("content", "")) for p in content if isinstance(p, dict)]
        ).strip()
    return str(content)


def normalize_history_messages(history):
    """Return a clean list of {'role', 'content'} dicts, dropping malformed entries."""
    history = history or []
    out = []
    for m in history:
        if isinstance(m, dict) and m.get("role") in ("user", "assistant", "system"):
            out.append({"role": m["role"], "content": content_to_text(m["content"])})
    return out


# ==============================================================================
# 3) GOOGLE SEARCH
# ==============================================================================
def create_session() -> requests.Session:
    """Build a requests session that retries transient HTTP failures on GET."""
    s = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503],
        allowed_methods=["GET"],
    )
    s.mount("https://", HTTPAdapter(max_retries=retries))
    return s


session = create_session()


@lru_cache(maxsize=128)
def search_web(query: str) -> str | None:
    """Query Google Custom Search and return up to 3 results as bullet lines.

    Returns None when keys are missing, no results exist, or the request fails.
    NOTE: results (including None failures) are memoized for the process lifetime.
    """
    # Safety: bail out immediately if keys or query are missing.
    if not GOOGLE_API_KEY or not SEARCH_ENGINE_ID or not query:
        return None
    try:
        logging.info(f"🔍 Google Suche: {query}")
        res = session.get(
            "https://www.googleapis.com/customsearch/v1",
            params={"key": GOOGLE_API_KEY, "cx": SEARCH_ENGINE_ID, "q": query, "num": 3},
            timeout=8,
        )
        res.raise_for_status()
        items = res.json().get("items", [])
        if not items:
            return None
        lines = []
        for i in items:
            lines.append(f"- {i.get('title', '')}\n  {i.get('snippet', '')}\n  {i.get('link', '')}")
        return "\n".join(lines)
    except Exception as e:
        logging.error(f"Search Fail: {e}")
        return None


# ==============================================================================
# 4) CHAT STREAM LOGIC
# ==============================================================================
def add_user_message(msg, history, profile: gr.OAuthProfile | None = None):
    """Append the user's message to the history and clear the input box.

    Returns exactly two values to match the wired outputs (msg_input, chatbot).
    """
    history = normalize_history_messages(history)
    if not _is_allowed(profile):
        history.append({"role": "assistant", "content": "🔒 Nicht autorisiert."})
        return "", history
    msg = (msg or "").strip()
    if msg:
        history.append({"role": "user", "content": msg})
    return "", history


def chat_stream(
    history,
    model,
    system_prompt,
    max_tokens,
    temp,
    top_p,
    use_search,
    profile: gr.OAuthProfile | None = None,
):
    """Stream the model's answer for the last user turn; yields (history, usage_text).

    Handles auth, missing API keys, optional web-search context injection,
    incremental token streaming, and final cost reporting.
    """
    history = normalize_history_messages(history)
    usage_text = ""

    # 1. Auth check.
    if not _is_allowed(profile):
        history.append({"role": "assistant", "content": "🔒 Nicht autorisiert."})
        yield history, "🔒"
        return

    # 2. Key check (critical).
    if not client:
        history.append({
            "role": "assistant",
            "content": "⚠️ **Konfigurations-Fehler:** `HYPERBOLIC_API_KEY` fehlt in den Umgebungsvariablen.",
        })
        yield history, "❌ Key fehlt"
        return

    if not history or history[-1]["role"] != "user":
        yield history, usage_text
        return

    user_text = history[-1]["content"]
    # Snapshot the turns *before* the user message now, so that any warning
    # appended below does not cause the user turn to be sent to the API twice.
    prior_turns = list(history[:-1])

    # 3. Web search check (warn instead of crashing).
    context_add = ""
    if use_search:
        if not GOOGLE_API_KEY or not SEARCH_ENGINE_ID:
            history.append({
                "role": "assistant",
                "content": "⚠️ Google Suche an, aber `GOOGLE_API_KEY` oder `GOOGLE_CX` fehlen. Mache ohne Suche weiter...",
            })
        else:
            search_res = search_web(user_text)
            if search_res:
                now = _local_now()
                short_res = _truncate(search_res, WEB_CONTEXT_MAX_CHARS)
                context_add = (
                    f"\n\n--- WEB INFO ({now:%Y-%m-%d %H:%M}) ---\n"
                    f"{short_res}\n----------------------------------"
                )

    # 4. Message assembly.
    messages = []
    if system_prompt.strip():
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(prior_turns)
    messages.append({"role": "user", "content": user_text + context_add})

    # Placeholder for the streamed assistant answer.
    history.append({"role": "assistant", "content": ""})
    yield history, usage_text

    # 5. API call.
    try:
        completion = client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=clamp_tokens(model, max_tokens),
            temperature=float(temp),
            top_p=float(top_p),
            stream=True,
            stream_options={"include_usage": True},
        )

        full_response = ""
        completion_tokens = 0

        for chunk in completion:
            # Extract text content defensively (usage-only chunks have no choices).
            delta = ""
            if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
                delta = chunk.choices[0].delta.content or ""

            if delta:
                full_response += delta
                # Always show text so the UI never appears stuck; once the
                # clean-marker arrives the display switches to the clean part.
                clean_text = _clean_response(full_response)
                history[-1]["content"] = clean_text
                yield history, usage_text

            # Usage stats (present on the final chunk when include_usage is set).
            if hasattr(chunk, "usage") and chunk.usage:
                completion_tokens = chunk.usage.completion_tokens or 0

            # Finish-reason check (guards against NoneType errors).
            if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
                finish = getattr(chunk.choices[0], "finish_reason", None)
                if finish in ["stop", "length"]:
                    break

        # Final costs.
        if completion_tokens > 0:
            cost = cost_from_completion_tokens(model, completion_tokens)
            usage_text = f"Tokens: {completion_tokens} | Kosten: ${cost:.5f}"
        yield history, usage_text

    except Exception as e:
        history[-1]["content"] += f"\n\n⚠️ **API Fehler:** {str(e)}"
        yield history, "❌ Fehler"


# ==============================================================================
# 5) UI LAYOUT
# ==============================================================================
def update_tokens_ui(model):
    """Adjust the max-tokens slider to the selected model's limit."""
    val = int(MODELS.get(model, {}).get("max_tokens", 2048))
    return gr.update(maximum=val, value=min(2048, val))


with gr.Blocks(title="Hyperbolic Chat", fill_height=True) as demo:
    gr.Markdown("## 🚀 Hyperbolic Chat (Env Vars • Allowlist: dtometzki)")

    with gr.Row():
        with gr.Column(scale=4):
            chatbot = gr.Chatbot(height=700)
            with gr.Row():
                msg_input = gr.Textbox(placeholder="Eingabe...", show_label=False, scale=4)
                submit_btn = gr.Button("Senden", variant="primary", scale=1)
            clear_btn = gr.Button("🗑️ Verlauf leeren")

        with gr.Column(scale=1, variant="panel"):
            gr.LoginButton()
            model_dd = gr.Dropdown(MODEL_CHOICES, value=MODEL_CHOICES[0], label="Modell")
            use_search_chk = gr.Checkbox(label="🌐 Google Suche nutzen", value=False)
            gr.Markdown("---")
            system_txt = gr.Textbox("Du bist ein hilfreicher Assistent.", label="System Prompt", lines=2)
            tokens_sld = gr.Slider(1, MAX_TOKENS_GLOBAL, value=2048, label="Max Tokens")
            temp_sld = gr.Slider(0.0, 2.0, value=0.7, label="Temperature")
            top_p_sld = gr.Slider(0.1, 1.0, value=0.95, label="Top-P")
            gr.Markdown("---")
            usage_md = gr.Markdown("Kosten: -")

    # Event wiring: add the user message synchronously, then stream the answer.
    params = [chatbot, model_dd, system_txt, tokens_sld, temp_sld, top_p_sld, use_search_chk]

    msg_input.submit(add_user_message, [msg_input, chatbot], [msg_input, chatbot], queue=False).then(
        chat_stream, params, [chatbot, usage_md], queue=True
    )
    submit_btn.click(add_user_message, [msg_input, chatbot], [msg_input, chatbot], queue=False).then(
        chat_stream, params, [chatbot, usage_md], queue=True
    )
    model_dd.change(update_tokens_ui, model_dd, tokens_sld)
    clear_btn.click(lambda: ([], ""), None, [chatbot, usage_md])


def check_keys_startup():
    """Print a startup summary of which API keys are configured."""
    print("\n" + "=" * 40)
    print("🔎 STARTUP CHECK:")
    if HYPERBOLIC_API_KEY:
        print("✅ HYPERBOLIC_API_KEY gefunden.")
    else:
        print("❌ HYPERBOLIC_API_KEY fehlt! Chat wird Fehler zeigen.")
    if GOOGLE_API_KEY and SEARCH_ENGINE_ID:
        print("✅ Google Search Keys gefunden.")
    else:
        print("⚠️ Google Search Keys fehlen (Suche wird ignoriert).")
    print("=" * 40 + "\n")


check_keys_startup()

if __name__ == "__main__":
    os.environ["GRADIO_SSR_MODE"] = "False"
    demo.queue().launch(server_name="0.0.0.0", server_port=7860, share=False)