Spaces:
Running
Running
API-based serving (api.1street.ai), English-only UI; identical UI to Darwin-9B-Opus space
013d2bd verified | """ | |
| 𧬠Darwin-9B-NEG β API Serving (OpenAI-compatible) | |
| No local GPU. Streams from the VIDRAFT inference API (api.1street.ai). | |
| Custom frontend (index.html) preserved exactly β only the backend is API-based. | |
| """ | |
| import sys | |
| print(f"[BOOT] Python {sys.version}", flush=True) | |
| import base64, os, re, json | |
| from typing import Generator, Optional | |
| import gradio as gr | |
| print(f"[BOOT] gradio {gr.__version__}", flush=True) | |
| import requests, httpx, uvicorn | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse | |
| from urllib.parse import urlencode | |
| import pathlib, secrets | |
| import urllib3 | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
# ──────────────────────────────────────────────────────────────────────────────
# 1. MODEL / API CONFIG
# ──────────────────────────────────────────────────────────────────────────────
# Root of the OpenAI-compatible inference API; overridable via env.
API_BASE = os.getenv("VIDRAFT_API_BASE", "https://api.1street.ai")
# Model identifier sent in each chat-completions payload.
API_MODEL = os.getenv("VIDRAFT_API_MODEL", "vidraft/darwin-9b-neg")
API_KEY = os.getenv("VIDRAFT_API_KEY", "")  # optional bearer; empty = no auth
MODEL_ID = "FINAL-Bench/Darwin-9B-NEG"  # reported by the /health endpoint
MODEL_NAME = "Darwin-9B-NEG"            # display name (window title, boot log)
# Advertised capabilities; max_tokens / temp_max clamp user inputs in generate_reply.
MODEL_CAP = {
    "arch": "Qwen3.5 Dense", "active": "9B",
    "ctx": "131K", "thinking": True, "vision": False,
    "max_tokens": 8192, "temp_max": 1.5,
}
# System-prompt presets the frontend can select; "general" is the default.
PRESETS = {
    "general": "You are Darwin-9B-NEG, a highly capable reasoning model created by VIDRAFT via Negentropy distillation. Think step by step for complex questions.",
    "code": "You are an expert software engineer. Write clean, efficient, well-commented code. Explain your approach before writing. Use modern best practices.",
    "math": "You are a world-class mathematician. Break problems step-by-step. Show full working. Use LaTeX where helpful.",
    "creative": "You are a brilliant creative writer. Be imaginative, vivid, and engaging. Adapt tone and style to the request.",
    "translate": "You are a professional translator. Provide accurate, natural-sounding translations with cultural context.",
    "research": "You are a rigorous research analyst. Provide structured, well-reasoned analysis. Identify assumptions and acknowledge uncertainty.",
}
print(f"[API] base={API_BASE} model={API_MODEL} auth={'yes' if API_KEY else 'no'}", flush=True)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 2. THINKING MODE HELPERS (unchanged β drives the reasoning-chain UI) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def parse_think_blocks(text: str) -> tuple[str, str]:
    """Split *text* into (reasoning, answer) around a closed <think>...</think> block.

    When no complete block is present, returns ("", text) with *text* untouched.
    """
    match = re.search(r"<think>(.*?)</think>\s*", text, re.DOTALL)
    if match is None:
        return "", text
    chain = match.group(1).strip()
    answer = text[match.end():].strip()
    return chain, answer
| def _is_thinking_line(line: str) -> bool: | |
| l = line.strip() | |
| if not l: | |
| return True | |
| think_starts = [ | |
| "The user", "the user", "This is", "this is", "I should", "I need to", | |
| "Let me", "let me", "My task", "my task", "I'll ", "I will", | |
| "Since ", "since ", "Now,", "now,", "So,", "so,", "First,", "first,", | |
| "Okay", "okay", "Alright", "Hmm", "Wait", "Actually", | |
| "The question", "the question", "The input", "the input", | |
| "The request", "the request", "The prompt", "the prompt", | |
| "Thinking Process", "Thinking process", "**Thinking", | |
| "Step ", "step ", "Approach:", "Analysis:", "Reasoning:", | |
| "1. **", "2. **", "3. **", "4. **", "5. **", | |
| ] | |
| for s in think_starts: | |
| if l.startswith(s): | |
| return True | |
| if l.startswith(("- ", "* ", "β ")) and any(c.isascii() and c.isalpha() for c in l[:20]): | |
| if not any(ord(c) > 0x1100 for c in l[:30]): | |
| return True | |
| return False | |
def _split_thinking_answer(raw: str) -> tuple:
    """Best-effort split of an untagged response into (thinking, answer).

    Scans line by line for the first non-thinking line that either contains a
    CJK-range character near its start, or (after a few lines of context) is
    preceded by two blank lines. Returns ("", raw) when no boundary is found
    or the boundary is the very first line.
    """
    lines = raw.split("\n")
    boundary = -1
    for idx, line in enumerate(lines):
        if _is_thinking_line(line):
            continue
        # A wide (CJK-range) character near the start signals answer content.
        if any(ord(ch) > 0x1100 for ch in line.strip()[:10]):
            boundary = idx
            break
        # Otherwise require two blank lines immediately before this one.
        if idx > 2 and all(not lines[j].strip() for j in range(max(0, idx - 2), idx)):
            boundary = idx
            break
    if boundary > 0:
        thinking = "\n".join(lines[:boundary]).strip()
        answer = "\n".join(lines[boundary:]).strip()
        return thinking, answer
    return "", raw
def format_response(raw: str) -> str:
    """Render raw model output for the UI, folding any reasoning into <details>.

    Handles three cases: a closed <think> block, a still-streaming open
    <think> tag (progress indicator), and untagged output whose first line
    looks like chain-of-thought (heuristic split). Anything else is returned
    unchanged.
    """
    chain, answer = parse_think_blocks(raw)
    if chain:
        header = "<details>\n<summary>π§ Reasoning Chain β click to expand</summary>\n\n"
        return f"{header}{chain}\n\n</details>\n\n{answer}"

    # Open <think> without a closing tag: reasoning is still streaming in.
    if "<think>" in raw and "</think>" not in raw:
        think_len = len(raw) - raw.index("<think>") - 7  # 7 == len("<think>")
        return f"π§ Reasoning... ({think_len} chars)"

    stripped = raw.strip()
    first_line = stripped.split("\n")[0] if stripped else ""
    if _is_thinking_line(first_line) and len(raw) > 20:
        thinking, answer = _split_thinking_answer(raw)
        if thinking and answer:
            return (
                f"<details>\n<summary>π§ Reasoning Chain ({len(thinking)} chars)</summary>\n\n"
                f"{thinking}\n\n</details>\n\n{answer}"
            )
        if thinking and not answer:
            return f"π§ Reasoning... ({len(raw)} chars)"
    return raw
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 3. GENERATION β streamed from the VIDRAFT OpenAI-compatible API | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _coerce_text(value) -> Optional[str]:
    """Flatten a Gradio message payload (str, or list of content parts) to text."""
    if value is None:
        return None
    if isinstance(value, list):
        return " ".join(p.get("text", "") for p in value
                        if isinstance(p, dict) and p.get("type") == "text")
    return str(value)


def _build_messages(message: str, history: list, system_prompt: str) -> list[dict]:
    """Convert Gradio chat history into an OpenAI-style messages list.

    Supports both "messages"-format history (dicts with role/content) and
    legacy (user, assistant) pairs. Assistant turns are stripped of their
    <think> blocks so the model is not fed its own reasoning as context.
    """
    messages: list[dict] = []
    if system_prompt.strip():
        messages.append({"role": "system", "content": system_prompt.strip()})
    for turn in history:
        if isinstance(turn, dict):
            role = turn.get("role", "")
            text = _coerce_text(turn.get("content") or "") or ""
            if role == "user":
                messages.append({"role": "user", "content": text})
            elif role == "assistant":
                _, clean = parse_think_blocks(text)
                messages.append({"role": "assistant", "content": clean})
        else:
            # Legacy tuple/list pair: (user_text, assistant_text).
            try:
                user_part = turn[0] or None
                asst_part = turn[1] if len(turn) > 1 else None
            except (IndexError, TypeError):
                continue
            user_text = _coerce_text(user_part)
            asst_text = _coerce_text(asst_part)
            if user_text:
                messages.append({"role": "user", "content": user_text})
            if asst_text:
                _, clean = parse_think_blocks(asst_text)
                messages.append({"role": "assistant", "content": clean})
    messages.append({"role": "user", "content": message})
    return messages


def generate_reply(
    message: str,
    history: list,
    thinking_mode: str,
    image_input,
    system_prompt: str,
    max_new_tokens: int,
    temperature: float,
    top_p: float,
) -> Generator[str, None, None]:
    """Stream a chat completion from the VIDRAFT API, yielding formatted partials.

    ``thinking_mode`` and ``image_input`` are accepted for UI-signature
    compatibility with the frontend but are not sent to the API (the model
    has no vision, per MODEL_CAP).

    Yields progressively longer UI-formatted strings; on failure yields
    whatever partial output was received, or an error message.
    """
    # Clamp sampling knobs to the model's advertised capabilities.
    max_new_tokens = min(int(max_new_tokens), MODEL_CAP["max_tokens"])
    temperature = min(max(float(temperature), 0.0), MODEL_CAP["temp_max"])

    messages = _build_messages(message, history, system_prompt)

    payload = {
        "model": API_MODEL,
        "messages": messages,
        "max_tokens": max_new_tokens,
        "temperature": temperature,
        "top_p": float(top_p),
        "stream": True,
        "stream_options": {"include_usage": True},
    }
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["Authorization"] = f"Bearer {API_KEY}"
    print(f"[GEN] -> {API_BASE} model={API_MODEL} max_new={max_new_tokens} temp={temperature}", flush=True)

    output = ""
    try:
        with httpx.Client(timeout=httpx.Timeout(300.0, connect=15.0)) as client:
            with client.stream("POST", f"{API_BASE}/v1/chat/completions",
                               json=payload, headers=headers) as r:
                if r.status_code != 200:
                    body = r.read().decode(errors="ignore")[:300]
                    yield f"**β API error {r.status_code}:** `{body}`"
                    return
                for line in r.iter_lines():
                    if not line:
                        continue
                    if line.startswith("data: "):  # SSE frame prefix
                        line = line[6:]
                    if line.strip() == "[DONE]":
                        break
                    try:
                        chunk = json.loads(line)
                    except Exception:
                        continue  # tolerate keep-alives / malformed frames
                    choices = chunk.get("choices") or []
                    if choices:
                        delta = choices[0].get("delta") or {}
                        piece = delta.get("content") or ""
                        if piece:
                            output += piece
                            yield format_response(output)
    except Exception as e:
        # Stream/network failure: surface partial output if any, else the error.
        if output:
            yield format_response(output)
        else:
            yield f"**β Generation error:** `{e}`"
        return
    if output:
        print(f"[GEN] Done β {len(output)} chars", flush=True)
        yield format_response(output)
    else:
        yield "**β οΈ The model returned an empty response.** Please try again."
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4. GRADIO BLOCKS (api_name="chat" β index.html calls /gradio_api/call/chat) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Hidden controls: the custom index.html frontend supplies these values
# positionally through the Gradio API (api_name="chat"); nothing here is
# meant to be shown in the Gradio UI itself.
with gr.Blocks(title=MODEL_NAME) as gradio_demo:
    thinking_toggle = gr.Radio(
        choices=["β‘ Fast Mode (direct answer)",
                 "π§ Thinking Mode (chain-of-thought reasoning)"],
        value="β‘ Fast Mode (direct answer)",
        visible=False,
    )
    image_input = gr.Textbox(value="", visible=False)  # unused: model has no vision
    system_prompt = gr.Textbox(value=PRESETS["general"], visible=False)
    max_new_tokens = gr.Slider(minimum=64, maximum=8192, value=4096, visible=False)
    temperature = gr.Slider(minimum=0.0, maximum=1.5, value=0.6, visible=False)
    top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.9, visible=False)
    # Exposed at /gradio_api/call/chat; the frontend calls it directly.
    gr.ChatInterface(
        fn=generate_reply,
        api_name="chat",
        additional_inputs=[
            thinking_toggle, image_input,
            system_prompt, max_new_tokens, temperature, top_p,
        ],
    )
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 5. FASTAPI β index.html + OAuth + utility APIs (unchanged structure) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
fapp = FastAPI()
SESSIONS: dict[str, dict] = {}  # in-memory session store: sid -> user record
HTML = pathlib.Path(__file__).parent / "index.html"  # custom frontend served at /
CLIENT_ID = os.getenv("OAUTH_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET", "")
SPACE_HOST = os.getenv("SPACE_HOST", "localhost:7860")
# Must match the redirect URI registered with the OAuth client.
REDIRECT_URI = f"https://{SPACE_HOST}/login/callback"
print(f"[OAuth] CLIENT_ID set: {bool(CLIENT_ID)}")
print(f"[OAuth] SPACE_HOST: {SPACE_HOST}")
# Hugging Face OAuth endpoints.
HF_AUTH_URL = "https://huggingface.co/oauth/authorize"
HF_TOKEN_URL = "https://huggingface.co/oauth/token"
HF_USER_URL = "https://huggingface.co/oauth/userinfo"
SCOPES = os.getenv("OAUTH_SCOPES", "openid profile")
def _sid(req: Request) -> Optional[str]:
    """Return the session id from the ``mc_session`` cookie, or None."""
    cookie = req.cookies.get("mc_session")
    return cookie
def _user(req: Request) -> Optional[dict]:
    """Look up the session record for this request; None when not logged in."""
    sid = _sid(req)
    if not sid:
        return None
    return SESSIONS.get(sid)
async def root(request: Request):
    """Serve the custom frontend, or a stub message when index.html is absent.

    NOTE(review): no route decorator is visible here — presumably registered
    elsewhere (or lost in extraction); confirm the route wiring.
    """
    if HTML.exists():
        return HTMLResponse(HTML.read_text(encoding="utf-8"))
    return HTMLResponse("<h2>index.html missing</h2>")
async def oauth_user(request: Request):
    """Return the current session record as JSON, or 401 if unauthenticated."""
    user = _user(request)
    if user:
        return JSONResponse(user)
    return JSONResponse({"logged_in": False}, status_code=401)
async def oauth_login(request: Request):
    """Redirect the browser to the Hugging Face OAuth authorize endpoint."""
    if not CLIENT_ID:
        return RedirectResponse("/?oauth_error=not_configured")
    # NOTE(review): this state token is never verified in the callback — confirm.
    query = urlencode({
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": SCOPES,
        "state": secrets.token_urlsafe(16),
    })
    return RedirectResponse(f"{HF_AUTH_URL}?{query}", status_code=302)
async def oauth_callback(code: str = "", error: str = "", state: str = ""):
    """Exchange the OAuth code for a token, fetch user info, create a session.

    On any failure, redirects back to "/" with ?auth_error=1.

    NOTE(review): ``state`` is accepted but never validated against the value
    issued in oauth_login — CSRF protection is incomplete; confirm and fix.
    """
    if error or not code:
        return RedirectResponse("/?auth_error=1")
    # HF's token endpoint authenticates the client via HTTP Basic (id:secret).
    basic = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
    async with httpx.AsyncClient() as client:
        tok = await client.post(HF_TOKEN_URL, data={"grant_type": "authorization_code", "code": code, "redirect_uri": REDIRECT_URI},
                                headers={"Accept": "application/json", "Authorization": f"Basic {basic}"})
        if tok.status_code != 200:
            return RedirectResponse("/?auth_error=1")
        access_token = tok.json().get("access_token", "")
        if not access_token:
            return RedirectResponse("/?auth_error=1")
        uinfo = await client.get(HF_USER_URL, headers={"Authorization": f"Bearer {access_token}"})
        if uinfo.status_code != 200:
            return RedirectResponse("/?auth_error=1")
        user = uinfo.json()
        sid = secrets.token_urlsafe(32)
        # NOTE(review): SESSIONS is an unbounded in-memory dict; entries live
        # until logout or process restart.
        SESSIONS[sid] = {
            "logged_in": True,
            "username": user.get("preferred_username", user.get("name", "User")),
            "name": user.get("name", ""),
            "avatar": user.get("picture", ""),
            "profile": f"https://huggingface.co/{user.get('preferred_username', '')}",
        }
        resp = RedirectResponse("/")
        # Session cookie: 7 days, HTTPS-only, inaccessible to page JS.
        resp.set_cookie("mc_session", sid, httponly=True, samesite="lax", secure=True, max_age=60 * 60 * 24 * 7)
        return resp
async def oauth_logout(request: Request):
    """Drop the server-side session (if any) and clear the session cookie."""
    sid = _sid(request)
    if sid:
        SESSIONS.pop(sid, None)
    resp = RedirectResponse("/")
    resp.delete_cookie("mc_session")
    return resp
async def health():
    """Liveness probe; reports which API backend is serving the model."""
    return {
        "status": "ok",
        "model": MODEL_ID,
        "serving": "api",
        "api_base": API_BASE,
    }
# ── Web Search API (Brave) ──
# Subscription token for the Brave Search API; empty disables the search route.
BRAVE_API_KEY = os.getenv("BRAVE_API_KEY", "")
async def api_search(request: Request):
    """Proxy a web search to the Brave Search API.

    Expects a JSON body ``{"query": "..."}``; returns up to five results as
    ``{"results": [{"title", "desc", "url"}]}``. Errors are returned as JSON
    with an appropriate status code.
    """
    body = await request.json()
    query = body.get("query", "").strip()
    if not query:
        return JSONResponse({"error": "empty query"}, status_code=400)
    if not BRAVE_API_KEY:
        return JSONResponse({"error": "BRAVE_API_KEY not set"}, status_code=500)
    try:
        # Use the async client: a blocking requests.get() here would stall the
        # whole event loop for up to the 10 s timeout.
        async with httpx.AsyncClient(timeout=10) as client:
            r = await client.get(
                "https://api.search.brave.com/res/v1/web/search",
                headers={"X-Subscription-Token": BRAVE_API_KEY, "Accept": "application/json"},
                params={"q": query, "count": 5},
            )
        r.raise_for_status()
        results = r.json().get("web", {}).get("results", [])
        items = [{"title": it.get("title", ""), "desc": it.get("description", ""), "url": it.get("url", "")} for it in results[:5]]
        return JSONResponse({"results": items})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
| # ββ PDF Text Extraction ββ | |
async def api_extract_pdf(request: Request):
    """Extract text from a base64-encoded PDF posted as ``{"data": "..."}``.

    Accepts either a bare base64 string or a data URL. Uses PyMuPDF (fitz)
    when installed; otherwise falls back to a crude printable-ASCII scrape of
    the raw bytes. Output is capped at 8000 characters.
    """
    try:
        body = await request.json()
        b64 = body.get("data", "")
        if "," in b64:  # strip a data-URL prefix such as "data:application/pdf;base64,"
            b64 = b64.split(",", 1)[1]
        pdf_bytes = base64.b64decode(b64)
        text = ""
        try:
            import fitz  # PyMuPDF; optional dependency
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
            try:
                for page in doc:
                    text += page.get_text() + "\n"
            finally:
                # Release the document even if extraction fails part-way —
                # the original leaked it on every request.
                doc.close()
        except ImportError:
            # Fallback: keep only printable ASCII (plus newlines) from raw bytes.
            content = pdf_bytes.decode("utf-8", errors="ignore")
            text = re.sub(r'[^\x20-\x7E\n\r]', '', content)
        text = text.strip()[:8000]
        return JSONResponse({"text": text, "chars": len(text)})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 6. MOUNT & RUN | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Mount the Gradio app under /gradio; FastAPI keeps "/" and the utility routes.
app = gr.mount_gradio_app(fapp, gradio_demo, path="/gradio")
if __name__ == "__main__":
    print(f"[BOOT] {MODEL_NAME} Β· API serving Β· Ready", flush=True)
    # 0.0.0.0:7860 is the standard HF Spaces binding.
    uvicorn.run(app, host="0.0.0.0", port=7860)