# Hugging Face Space: Hyperbolic Chat — app.py
# (The "Spaces: Running" lines below the title were UI status residue from the
# Spaces page capture, not part of the program.)
import os
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from functools import lru_cache  # NOTE(review): not used anywhere visible in this file — confirm before removing
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import gradio as gr
from openai import OpenAI

# ==============================================================================
# 1) SETUP & CONFIGURATION
# ==============================================================================
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# All credentials come from environment variables (Space secrets).
HYPERBOLIC_API_KEY = os.environ.get("HYPERBOLIC_API_KEY")
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
SEARCH_ENGINE_ID = os.environ.get("GOOGLE_CX")

# Only these Hugging Face accounts may chat; comparison is case-insensitive.
ALLOWED_HF_USERS = {"dtometzki"}
ALLOWED_HF_USERS_LOWER = {u.lower() for u in ALLOWED_HF_USERS}

# Upper bound for web-search context injected into the prompt.
WEB_CONTEXT_MAX_CHARS = 800

# Initialise the OpenAI-compatible client only when the API key is present;
# chat_stream reports a configuration error when `client` stays None.
client = None
if HYPERBOLIC_API_KEY:
    client = OpenAI(api_key=HYPERBOLIC_API_KEY, base_url="https://api.hyperbolic.xyz/v1")
# Selectable chat models and their per-request completion-token ceilings.
MODELS = {
    "Qwen/Qwen3-Next-80B-A3B-Instruct": {"max_tokens": 8192},
    "meta-llama/Llama-3.3-70B-Instruct": {"max_tokens": 8192},
    "deepseek-ai/DeepSeek-V3": {"max_tokens": 131072},
    "openai/gpt-oss-120b": {"max_tokens": 8192},
}
MODEL_CHOICES = list(MODELS.keys())
# Slider maximum across all models; per-model clamping happens in clamp_tokens.
MAX_TOKENS_GLOBAL = max(v["max_tokens"] for v in MODELS.values())

# USD price per one million *output* tokens, used for the cost read-out.
PRICE_PER_1M_OUTPUT = {
    "Qwen/Qwen3-Next-80B-A3B-Instruct": 0.30,
    "meta-llama/Llama-3.3-70B-Instruct": 0.40,
    "deepseek-ai/DeepSeek-V3": 0.25,
    "openai/gpt-oss-120b": 0.30,
}
# ==============================================================================
# 2) HELPERS
# ==============================================================================
def cost_from_completion_tokens(model: str, completion_tokens: int) -> float:
    """Return the output-token cost in USD; unknown models are billed at 0.0."""
    price_per_million = float(PRICE_PER_1M_OUTPUT.get(model, 0.0))
    tokens_in_millions = float(completion_tokens) / 1_000_000.0
    return tokens_in_millions * price_per_million
| def _local_now(tz="Europe/Berlin") -> datetime: | |
| return datetime.now(ZoneInfo(tz)) | |
| def _truncate(text: str, max_chars: int) -> str: | |
| text = (text or "").strip() | |
| if len(text) <= max_chars: | |
| return text | |
| return text[: max_chars - 1].rstrip() + "โฆ" | |
| def _profile_username(profile) -> str: | |
| if profile is None: return "" | |
| return (getattr(profile, "username", None) or getattr(profile, "name", None) or "").strip() | |
def _is_allowed(profile) -> bool:
    """True when the profile's username is on the case-insensitive allowlist."""
    username = _profile_username(profile).lower()
    return username in ALLOWED_HF_USERS_LOWER
def clamp_tokens(model: str, max_tokens) -> int:
    """Coerce *max_tokens* to an int and clamp it to [1, model's ceiling].

    Unknown models get a 2048-token ceiling; values that cannot be converted
    to int also fall back to 2048 before clamping.
    """
    model_max = int(MODELS.get(model, {}).get("max_tokens", 2048))
    try:
        value = int(max_tokens)
    except (TypeError, ValueError):
        # BUGFIX: was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit; only conversion failures should trigger the fallback.
        value = 2048
    return max(1, min(value, model_max))
| # --- WICHTIG: Clean Response ohne Blockieren --- | |
| def _clean_response(text: str) -> str: | |
| marker = "<|channel|>final<|message|>" | |
| # Wenn der Marker da ist -> alles davor abschneiden (sauber) | |
| if marker in text: | |
| return text.split(marker, 1)[-1] | |
| # Wenn der Marker NICHT da ist -> Text trotzdem anzeigen | |
| return text | |
def content_to_text(content) -> str:
    """Flatten a chat-message content payload (str, list of parts, or other) to plain text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Multi-part content: keep dict parts only, preferring "text" over "content".
        pieces = []
        for part in content:
            if isinstance(part, dict):
                pieces.append(str(part.get("text", "") or part.get("content", "")))
        return "\n".join(pieces).strip()
    return str(content)
def normalize_history_messages(history):
    """Return only well-formed role/content dicts from *history*, content flattened to text."""
    normalized = []
    for message in history or []:
        if not isinstance(message, dict):
            continue
        role = message.get("role")
        if role in ("user", "assistant", "system"):
            normalized.append({"role": role, "content": content_to_text(message["content"])})
    return normalized
# ==============================================================================
# 3) GOOGLE SEARCH
# ==============================================================================
def create_session() -> requests.Session:
    """Build a requests session that retries transient GET failures with backoff."""
    sess = requests.Session()
    retry_policy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503],
        allowed_methods=["GET"],
    )
    sess.mount("https://", HTTPAdapter(max_retries=retry_policy))
    return sess

# Shared module-level session: connection pooling + the retry policy above.
session = create_session()
def search_web(query: str) -> str | None:
    """Query Google Custom Search and return up to 3 hits as bullet lines.

    Returns None when credentials or the query are missing, when the search
    yields no items, or on any request/parse error (logged, never raised).
    """
    # Guard: without credentials (or a query) there is nothing to do.
    if not GOOGLE_API_KEY or not SEARCH_ENGINE_ID or not query:
        return None
    try:
        logging.info(f"๐ Google Suche: {query}")
        response = session.get(
            "https://www.googleapis.com/customsearch/v1",
            params={"key": GOOGLE_API_KEY, "cx": SEARCH_ENGINE_ID, "q": query, "num": 3},
            timeout=8,
        )
        response.raise_for_status()
        items = response.json().get("items", [])
        if not items:
            return None
        bullet_lines = []
        for item in items:
            bullet_lines.append(f"- {item.get('title', '')}\n {item.get('snippet', '')}\n {item.get('link', '')}")
        return "\n".join(bullet_lines)
    except Exception as e:
        logging.error(f"Search Fail: {e}")
        return None
# ==============================================================================
# 4) CHAT STREAM LOGIC
# ==============================================================================
def add_user_message(msg, history, profile: gr.OAuthProfile | None = None):
    """Append the submitted user message to the chat history.

    Returns ``(cleared_input, history)`` matching the two outputs
    ``[msg_input, chatbot]`` declared in the submit wiring.

    BUGFIX: previously returned three values ("", history, "") while the
    event wiring declares only two outputs, which makes Gradio raise a
    "too many output values" error on every submit.
    """
    history = normalize_history_messages(history)
    # Unauthorised users get a visible refusal instead of a silent drop.
    if not _is_allowed(profile):
        history.append({"role": "assistant", "content": "๐ Nicht autorisiert."})
        return "", history
    msg = (msg or "").strip()
    if msg:
        history.append({"role": "user", "content": msg})
    return "", history
def chat_stream(
    history, model, system_prompt, max_tokens, temp, top_p, use_search,
    profile: gr.OAuthProfile | None = None,
):
    """Stream an assistant reply for the newest user message in *history*.

    Generator: yields ``(history, usage_text)`` tuples so Gradio can
    live-update both the chatbot and the cost read-out. Every guard failure
    yields once and returns; API errors are appended to the last message
    instead of raising.
    """
    history = normalize_history_messages(history)
    usage_text = ""
    # 1. Auth check — allowlisted HF users only.
    if not _is_allowed(profile):
        history.append({"role": "assistant", "content": "๐ Nicht autorisiert."})
        yield history, "๐"
        return
    # 2. Key check (critical): without the API key no client was created.
    if not client:
        history.append({"role": "assistant", "content": "โ ๏ธ **Konfigurations-Fehler:** `HYPERBOLIC_API_KEY` fehlt in den Umgebungsvariablen."})
        yield history, "โ Key fehlt"
        return
    # Nothing to answer unless the newest turn came from the user.
    if not history or history[-1]["role"] != "user":
        yield history, usage_text
        return
    user_text = history[-1]["content"]
    # 3. Optional web search (warn instead of crash when keys are missing).
    context_add = ""
    if use_search:
        if not GOOGLE_API_KEY or not SEARCH_ENGINE_ID:
            history.append({"role": "assistant", "content": "โ ๏ธ Google Suche an, aber `GOOGLE_API_KEY` oder `GOOGLE_CX` fehlen. Mache ohne Suche weiter..."})
        else:
            search_res = search_web(user_text)
            if search_res:
                now = _local_now()
                short_res = _truncate(search_res, WEB_CONTEXT_MAX_CHARS)
                context_add = (
                    f"\n\n--- WEB INFO ({now:%Y-%m-%d %H:%M}) ---\n"
                    f"{short_res}\n----------------------------------"
                )
    # 4. Message assembly: system prompt + prior turns + augmented user turn.
    messages = []
    # BUGFIX: guard against a None system prompt (was `system_prompt.strip()`,
    # which raises AttributeError when the textbox delivers None).
    if (system_prompt or "").strip():
        messages.append({"role": "system", "content": system_prompt})
    for m in history[:-1]:
        messages.append(m)
    messages.append({"role": "user", "content": user_text + context_add})
    # Placeholder so the UI shows an assistant bubble immediately.
    history.append({"role": "assistant", "content": ""})
    yield history, usage_text
    # 5. Streaming API call.
    try:
        completion = client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=clamp_tokens(model, max_tokens),
            temperature=float(temp),
            top_p=float(top_p),
            stream=True,
            stream_options={"include_usage": True},
        )
        full_response = ""
        completion_tokens = 0
        for chunk in completion:
            # Extract the text delta defensively — usage-only chunks carry no choices.
            delta = ""
            if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
                delta = chunk.choices[0].delta.content or ""
            if delta:
                full_response += delta
                # Always display text so nothing appears stuck; once the clean
                # marker arrives, _clean_response switches the display to the
                # clean tail after it.
                history[-1]["content"] = _clean_response(full_response)
                yield history, usage_text
            # Usage stats (typically arrive on the final chunk).
            if hasattr(chunk, "usage") and chunk.usage:
                completion_tokens = chunk.usage.completion_tokens or 0
            # Finish-reason check (guards against NoneType access on choices).
            if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
                finish = getattr(chunk.choices[0], "finish_reason", None)
                if finish in ("stop", "length"):
                    break
        # Final cost read-out.
        if completion_tokens > 0:
            cost = cost_from_completion_tokens(model, completion_tokens)
            usage_text = f"Tokens: {completion_tokens} | Kosten: ${cost:.5f}"
        yield history, usage_text
    except Exception as e:
        history[-1]["content"] += f"\n\nโ ๏ธ **API Fehler:** {str(e)}"
        yield history, "โ Fehler"
# ==============================================================================
# 5) UI LAYOUT
# ==============================================================================
def update_tokens_ui(model):
    """Re-limit the max-tokens slider to the selected model's ceiling."""
    ceiling = int(MODELS.get(model, {}).get("max_tokens", 2048))
    return gr.update(maximum=ceiling, value=min(2048, ceiling))
with gr.Blocks(title="Hyperbolic Chat", fill_height=True) as demo:
    gr.Markdown("## ๐ Hyperbolic Chat (Env Vars โข Allowlist: dtometzki)")
    with gr.Row():
        with gr.Column(scale=4):
            # BUGFIX: this app stores history as {"role": ..., "content": ...}
            # dicts, so the Chatbot must use the "messages" format — the legacy
            # tuple default cannot render dict-based histories.
            chatbot = gr.Chatbot(height=700, type="messages")
            with gr.Row():
                msg_input = gr.Textbox(placeholder="Eingabe...", show_label=False, scale=4)
                submit_btn = gr.Button("Senden", variant="primary", scale=1)
            clear_btn = gr.Button("๐๏ธ Verlauf leeren")
        with gr.Column(scale=1, variant="panel"):
            gr.LoginButton()
            model_dd = gr.Dropdown(MODEL_CHOICES, value=MODEL_CHOICES[0], label="Modell")
            use_search_chk = gr.Checkbox(label="๐ Google Suche nutzen", value=False)
            gr.Markdown("---")
            system_txt = gr.Textbox("Du bist ein hilfreicher Assistent.", label="System Prompt", lines=2)
            tokens_sld = gr.Slider(1, MAX_TOKENS_GLOBAL, value=2048, label="Max Tokens")
            temp_sld = gr.Slider(0.0, 2.0, value=0.7, label="Temperature")
            top_p_sld = gr.Slider(0.1, 1.0, value=0.95, label="Top-P")
            gr.Markdown("---")
            usage_md = gr.Markdown("Kosten: -")
    # Event wiring: add the user turn unqueued (fast echo), then stream the reply.
    params = [chatbot, model_dd, system_txt, tokens_sld, temp_sld, top_p_sld, use_search_chk]
    msg_input.submit(add_user_message, [msg_input, chatbot], [msg_input, chatbot], queue=False).then(
        chat_stream, params, [chatbot, usage_md], queue=True
    )
    submit_btn.click(add_user_message, [msg_input, chatbot], [msg_input, chatbot], queue=False).then(
        chat_stream, params, [chatbot, usage_md], queue=True
    )
    model_dd.change(update_tokens_ui, model_dd, tokens_sld)
    clear_btn.click(lambda: ([], ""), None, [chatbot, usage_md])
def check_keys_startup():
    """Print a start-up summary showing which credentials are configured."""
    banner = "=" * 40
    print("\n" + banner)
    print("๐ STARTUP CHECK:")
    if HYPERBOLIC_API_KEY:
        print("โ HYPERBOLIC_API_KEY gefunden.")
    else:
        print("โ HYPERBOLIC_API_KEY fehlt! Chat wird Fehler zeigen.")
    if GOOGLE_API_KEY and SEARCH_ENGINE_ID:
        print("โ Google Search Keys gefunden.")
    else:
        print("โ ๏ธ Google Search Keys fehlen (Suche wird ignoriert).")
    print(banner + "\n")

# Run the summary at import time so Space logs show the key status immediately.
check_keys_startup()
if __name__ == "__main__":
    # Disable Gradio server-side rendering before launch; bind to all
    # interfaces on the standard Spaces port.
    os.environ["GRADIO_SSR_MODE"] = "False"
    demo.queue().launch(server_name="0.0.0.0", server_port=7860, share=False)