#!/usr/bin/env python3 """Darwin-35B-A3B-Opus FastAPI Server — HF Space (Docker SDK) OpenAI-compatible chat completions endpoint with optional bearer auth. INT4 quantization (default) fits 35B MoE into ~18GB → runs on L4/A10G/L40S. Environment variables: MODEL_ID — HuggingFace model id (default: FINAL-Bench/Darwin-35B-A3B-Opus) HF_TOKEN — HuggingFace token (for private/gated models) API_KEYS — Comma-separated bearer keys (empty = public, no auth) QUANT_MODE — int4 (default) | int8 | bf16 """ import os import re import time import json import threading import traceback from typing import List, Optional, Union, Any, Dict import torch from transformers import ( AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TextIteratorStreamer, ) from fastapi import FastAPI, HTTPException, Header, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, HTMLResponse from pydantic import BaseModel, Field # === Configuration === MODEL_ID = os.environ.get('MODEL_ID', 'FINAL-Bench/Darwin-35B-A3B-Opus') MODEL_NAME = MODEL_ID.split('/')[-1] HF_TOKEN = os.environ.get('HF_TOKEN', '').strip() or None API_KEYS = set(k.strip() for k in os.environ.get('API_KEYS', '').split(',') if k.strip()) QUANT_MODE = os.environ.get('QUANT_MODE', 'int4').lower() SPECIAL_TOKEN_RE = re.compile( r'<\|im_(?:start|end)\|>|<\|endoftext\|>|<\|startoftext\|>' ) def log(msg: str) -> None: print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True) def strip_special(text: str) -> str: return SPECIAL_TOKEN_RE.sub('', text) # === Globals === model = None tok = None inference_lock = threading.Lock() # === Pydantic schemas (OpenAI-compatible) === class ChatMessage(BaseModel): role: str content: Union[str, List[Dict[str, Any]]] class ChatCompletionRequest(BaseModel): model: str = MODEL_NAME messages: List[ChatMessage] max_tokens: int = Field(default=1024, ge=1, le=8192) temperature: float = Field(default=0.7, ge=0.0, le=2.0) top_p: float = Field(default=0.95, ge=0.0, le=1.0) n: int = Field(default=1, ge=1, le=4) stream: bool = False stop: Optional[Union[str, List[str]]] = None seed: Optional[int] = None repetition_penalty: Optional[float] = Field(default=None, ge=1.0, le=2.0) def verify_api_key(authorization: Optional[str] = Header(None)) -> None: if not API_KEYS: return # public if not authorization: raise HTTPException(401, 'Missing Authorization header. Use: Authorization: Bearer YOUR_API_KEY') if not authorization.lower().startswith('bearer '): raise HTTPException(401, 'Invalid Authorization format. Use: Bearer YOUR_API_KEY') token = authorization[7:].strip() if token not in API_KEYS: raise HTTPException(401, 'Invalid API key') # === FastAPI === app = FastAPI(title=f'{MODEL_NAME} API', version='1.0') app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) @app.get('/health') def health(): return { 'status': 'ok', 'model': MODEL_NAME, 'loaded': model is not None, 'quant_mode': QUANT_MODE, 'auth_required': len(API_KEYS) > 0, 'cuda': torch.cuda.is_available(), 'cuda_device_count': torch.cuda.device_count() if torch.cuda.is_available() else 0, } @app.get('/v1/models') def list_models(): return { 'object': 'list', 'data': [{ 'id': MODEL_NAME, 'object': 'model', 'created': int(time.time()), 'owned_by': 'FINAL-Bench', }], } def _stream_generate(inputs, gen_kwargs): """Background thread + SSE generator for streaming responses.""" streamer = TextIteratorStreamer( tok, skip_prompt=True, skip_special_tokens=False, timeout=600.0 ) gk = {**gen_kwargs, 'streamer': streamer} def _run(): with inference_lock: try: with torch.no_grad(): model.generate(**inputs, **gk) except Exception as e: log(f'stream gen FAIL: {e}') traceback.print_exc() t = threading.Thread(target=_run, daemon=True) t.start() def event_stream(): cid = f'chatcmpl-{int(time.time()*1000)}' first = { 'id': cid, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': MODEL_NAME, 'choices': [{'index': 0, 'delta': {'role': 'assistant'}, 'finish_reason': None}], } yield f'data: {json.dumps(first)}\n\n' for chunk_text in streamer: if not chunk_text: continue cleaned = strip_special(chunk_text) if not cleaned: continue delta = { 'id': cid, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': MODEL_NAME, 'choices': [{'index': 0, 'delta': {'content': cleaned}, 'finish_reason': None}], } yield f'data: {json.dumps(delta)}\n\n' last = { 'id': cid, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': MODEL_NAME, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}], } yield f'data: {json.dumps(last)}\n\n' yield 'data: [DONE]\n\n' return event_stream() @app.post('/v1/chat/completions', dependencies=[Depends(verify_api_key)]) def chat_completions(req: ChatCompletionRequest): if model is None: raise HTTPException(503, 'Model still loading') # Convert messages — flatten content if it's a list msgs = [] for m in req.messages: content = m.content if isinstance(content, list): # Take text-typed items only (no multimodal in v1) parts = [it.get('text', '') for it in content if isinstance(it, dict) and it.get('type') == 'text'] content = '\n'.join(parts) msgs.append({'role': m.role, 'content': content}) try: prompt = tok.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True) except Exception as e: raise HTTPException(400, f'chat_template error: {e}') inputs = tok(prompt, return_tensors='pt') input_device = next(model.parameters()).device inputs = {k: v.to(input_device) for k, v in inputs.items()} input_len = inputs['input_ids'].shape[1] if req.seed is not None: torch.manual_seed(req.seed) do_sample = req.temperature > 0 gen_kwargs = dict( max_new_tokens=req.max_tokens, do_sample=do_sample, temperature=req.temperature if do_sample else 1.0, top_p=req.top_p, pad_token_id=tok.eos_token_id, ) if req.repetition_penalty and req.repetition_penalty > 1.0: gen_kwargs['repetition_penalty'] = req.repetition_penalty # Streaming branch if req.stream: log(f'STREAM start: in={input_len} max={req.max_tokens}') return StreamingResponse( _stream_generate(inputs, gen_kwargs), media_type='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}, ) # Non-streaming if req.n > 1: gen_kwargs['num_return_sequences'] = req.n with inference_lock: t0 = time.time() with torch.no_grad(): try: outputs = model.generate(**inputs, **gen_kwargs) except Exception as e: log(f'generate FAIL: {e}') traceback.print_exc() raise HTTPException(500, f'generate error: {e}') elapsed = time.time() - t0 choices = [] total_completion = 0 for i in range(req.n): gen = outputs[i][input_len:] text = tok.decode(gen, skip_special_tokens=True) text = strip_special(text).strip() if req.stop: stops = [req.stop] if isinstance(req.stop, str) else req.stop for s in stops: idx = text.find(s) if idx >= 0: text = text[:idx] ct = int(len(gen)) total_completion += ct choices.append({ 'index': i, 'message': {'role': 'assistant', 'content': text}, 'finish_reason': 'stop' if ct < req.max_tokens else 'length', }) log(f'chat_completions: in={input_len} gen={total_completion} n={req.n} {elapsed:.1f}s') return { 'id': f'chatcmpl-{int(time.time()*1000)}', 'object': 'chat.completion', 'created': int(time.time()), 'model': MODEL_NAME, 'choices': choices, 'usage': { 'prompt_tokens': input_len, 'completion_tokens': total_completion, 'total_tokens': input_len + total_completion, }, } # === Landing page (HTML) === @app.get('/', response_class=HTMLResponse) def root(): state = 'loaded' if model is not None else 'loading...' auth_note = 'Bearer API key required' if API_KEYS else 'No auth (public)' return f"""
35B MoE 3B active {QUANT_MODE.upper()} OpenAI-compatible {state}
Self-hosted FastAPI inference server for FINAL-Bench/Darwin-35B-A3B-Opus.
Auth: {auth_note}
GET /health — health + load statusGET /v1/models — list available modelsPOST /v1/chat/completions — chat (OpenAI compat, supports streaming)curl https://final-bench-darwin-35b-a3b-opus-api.hf.space/v1/chat/completions \\
-H "Authorization: Bearer YOUR_API_KEY" \\
-H "Content-Type: application/json" \\
-d '{{"model":"{MODEL_NAME}","messages":[{{"role":"user","content":"Explain SN2 reaction"}}],"max_tokens":500}}'
from openai import OpenAI
client = OpenAI(
api_key="YOUR_API_KEY",
base_url="https://final-bench-darwin-35b-a3b-opus-api.hf.space/v1",
)
resp = client.chat.completions.create(
model="{MODEL_NAME}",
messages=[{{"role": "user", "content": "What is GPQA?"}}],
max_tokens=300,
)
print(resp.choices[0].message.content)
stream = client.chat.completions.create(
model="{MODEL_NAME}",
messages=[{{"role":"user","content":"Write a Python function"}}],
max_tokens=500,
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
curl https://final-bench-darwin-35b-a3b-opus-api.hf.space/health