import base64
import io
import json
import os
import re
import tempfile
from typing import Tuple, Optional, Dict, List

import gradio as gr
import httpx
from PIL import Image
from lzstring import LZString

# =========================
# Configuration
# =========================

# Base URL of the Nebius OpenAI-compatible API (trailing slash expected;
# endpoint paths are appended directly, e.g. "chat/completions").
NEBIUS_BASE_URL = "https://api.studio.nebius.com/v1/"

# Vision models that are confirmed to work with Nebius
DEFAULT_VISION_MODEL = "Qwen/Qwen2.5-VL-72B-Instruct"
VISION_MODELS = [
    DEFAULT_VISION_MODEL,
    "Qwen/Qwen2.5-VL-7B-Instruct",
]

# Code generation models confirmed to work on Nebius (verified and tested)
DEFAULT_CODE_MODEL = "Qwen/Qwen2.5-72B-Instruct"
CODE_MODELS = [
    # Qwen 2.5 Models (Latest generation - all verified working)
    "Qwen/Qwen2.5-72B-Instruct",
    "Qwen/Qwen2.5-Coder-32B-Instruct",
    "Qwen/Qwen2.5-32B-Instruct",
    "Qwen/Qwen2.5-14B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "Qwen/Qwen2.5-3B-Instruct",
    "Qwen/Qwen2.5-1.5B-Instruct",
    "Qwen/Qwen2.5-0.5B-Instruct",
    # QwQ Model (Reasoning specialized)
    "Qwen/QwQ-32B-Preview",
    # DeepSeek V3 (Latest)
    "deepseek-ai/DeepSeek-V3",
    # DeepSeek R1 Distill Models (All working)
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
]

# Model recommendations for different use cases.
# Keys are speed/quality tiers the UI can offer; values are ordered
# preference lists drawn from CODE_MODELS above.
MODEL_RECOMMENDATIONS = {
    "fast": [
        "Qwen/Qwen2.5-0.5B-Instruct",
        "Qwen/Qwen2.5-1.5B-Instruct",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
        "Qwen/Qwen2.5-3B-Instruct",
    ],
    "balanced": [
        "Qwen/Qwen2.5-14B-Instruct",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
        "Qwen/Qwen2.5-Coder-32B-Instruct",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
    ],
    "quality": [
        "Qwen/Qwen2.5-72B-Instruct",
        "deepseek-ai/DeepSeek-V3",
        "Qwen/QwQ-32B-Preview",
    ],
    "code_specialized": [
        "Qwen/Qwen2.5-Coder-32B-Instruct",
        "deepseek-ai/DeepSeek-V3",
        "Qwen/QwQ-32B-Preview",
    ]
}

# Timeouts and retries.
# Generous read timeout (180 s) because large-model completions stream slowly;
# HTTP_RETRIES is the number of attempts made by call_chat_completions.
HTTP_TIMEOUTS = httpx.Timeout(connect=15.0, read=180.0, write=30.0, pool=60.0)
HTTP_RETRIES = 3
Default API key DEFAULT_NEBIUS_API_KEY = ( "eyJhbGciOiJIUzI1NiIsImtpZCI6IlV6SXJWd1h0dnprLVRvdzlLZWstc0M1akptWXBvX1VaVkxUZlpnMDRlOFUiLCJ0eXAiOiJKV1QifQ.eyJzdWIiOiJnb29nbGUtb2F1dGgyfDEwNTA1MTQzMDg2MDMwMzIxNDEwMiIsInNjb3BlIjoib3BlbmlkIG9mZmxpbmVfYWNjZXNzIiwiaXNzIjoiYXBpX2tleV9pc3N1ZXIiLCJhdWQiOlsiaHR0cHM6Ly9uZWJpdXMtaW5mZXJlbmNlLmV1LmF1dGgwLmNvbS9hcGkvdjIvIl0sImV4cCI6MTkwNjU5ODA0NCwidXVpZCI6ImNkOGFiMWZlLTIxN2QtNDJlMy04OWUwLWM1YTg4MjcwMGVhNyIsIm5hbWUiOiJodW5nZ2luZyIsImV4cGlyZXNfYXQiOiIyMDMwLTA2LTAyVDAyOjM0OjA0KzAwMDAifQ.MA52QuIiNruK7_lX688RXAEI2TkcCOjcf_02XrpnhI8" ) # ========================= # Helpers # ========================= def get_api_key(user_key: str = "") -> str: """ Resolve the Nebius API key from: 1) The provided user_key field 2) The NEBIUS_API_KEY environment variable 3) The built-in DEFAULT_NEBIUS_API_KEY """ return (user_key or "").strip() or os.getenv("NEBIUS_API_KEY", "").strip() or DEFAULT_NEBIUS_API_KEY def test_model_availability(model: str, api_key: str) -> bool: """ Test if a model is available by making a simple request. """ try: url = f"{NEBIUS_BASE_URL}chat/completions" payload = { "model": model, "messages": [{"role": "user", "content": "Hi"}], "max_tokens": 5, "temperature": 0.1, } headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} with httpx.Client(timeout=httpx.Timeout(10.0)) as client: resp = client.post(url, headers=headers, json=payload) return resp.status_code == 200 except: return False def call_chat_completions( model: str, messages: list, api_key: str, max_tokens: int = 2000, temperature: float = 0.7, retry_with_fallback: bool = True, ) -> str: """ Calls the Nebius OpenAI-compatible chat completions endpoint via HTTP. Returns the assistant message content string. Includes retries and fallback models for better reliability. 
""" if not api_key: raise ValueError("Nebius API key is required.") url = f"{NEBIUS_BASE_URL}chat/completions" payload = { "model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, } headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} # Try with the requested model first last_error = None for attempt in range(HTTP_RETRIES): try: transport = httpx.HTTPTransport(retries=1) with httpx.Client(timeout=HTTP_TIMEOUTS, transport=transport) as client: resp = client.post(url, headers=headers, json=payload) if resp.status_code == 404 or resp.status_code == 400: # Model not found or bad request if retry_with_fallback and attempt == 0: # Try a fallback model fallback_models = { "vision": ["Qwen/Qwen2.5-VL-7B-Instruct"], "code": ["Qwen/Qwen2.5-7B-Instruct", "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"], } # Detect model type and use appropriate fallback model_type = "vision" if any(v in model.lower() for v in ["vision", "vl", "pixtral"]) else "code" for fallback in fallback_models.get(model_type, []): payload["model"] = fallback try: resp = client.post(url, headers=headers, json=payload) if resp.status_code == 200: break except Exception: continue resp.raise_for_status() data = resp.json() choices = data.get("choices", []) if not choices: raise RuntimeError("No choices returned from the API.") content = choices[0].get("message", {}).get("content", "") if not content: raise RuntimeError("Empty content returned from the API.") return content except (httpx.ReadTimeout, httpx.TimeoutException) as e: last_error = e # Reduce token count for retry payload["max_tokens"] = max(500, int(payload["max_tokens"] * 0.7)) continue except Exception as e: last_error = e if attempt < HTTP_RETRIES - 1: continue break if last_error: raise last_error raise RuntimeError(f"Failed to get response from model {model}") def _strip_fenced_code(text: str) -> str: """ Removes code fences from a content block if present. 
""" s = text.strip() # Remove various code fence patterns patterns = [ (r'^```html\s*\n?', r'\n?```$'), (r'^```HTML\s*\n?', r'\n?```$'), (r'^```\s*\n?', r'\n?```$'), ] for start_pattern, end_pattern in patterns: if re.match(start_pattern, s, re.IGNORECASE): s = re.sub(start_pattern, '', s, flags=re.IGNORECASE) s = re.sub(end_pattern, '', s, flags=re.IGNORECASE) break return s.strip() def ensure_complete_html_with_css(html_code: str) -> str: """ Ensures the HTML code is complete with proper structure and inline CSS. Forces inclusion of styles within the HTML document. """ # Check if it's already a complete HTML document has_doctype = "" in html_code.upper() has_html_tag = "]*>(.*?)', html_code, re.IGNORECASE | re.DOTALL) if style_matches: existing_styles = '\n'.join(style_matches) # Remove style tags from body content body_content = re.sub(r'', '', html_code, flags=re.IGNORECASE | re.DOTALL) html_code = f"""