#!/usr/bin/env python3
"""Darwin-35B-A3B-Opus FastAPI Server — HF Space (Docker SDK)

OpenAI-compatible chat completions endpoint with optional bearer auth.
INT4 quantization (default) fits 35B MoE into ~18GB → runs on L4/A10G/L40S.

Environment variables:
    MODEL_ID    — HuggingFace model id (default: FINAL-Bench/Darwin-35B-A3B-Opus)
    HF_TOKEN    — HuggingFace token (for private/gated models)
    API_KEYS    — Comma-separated bearer keys (empty = public, no auth)
    QUANT_MODE  — int4 (default) | int8 | bf16

Known limitations of the streaming branch: ``n`` and ``stop`` are not applied,
and the final chunk always reports ``finish_reason: "stop"``.
"""
import os
import re
import time
import json
import hmac
import queue
import threading
import traceback
from typing import List, Optional, Union, Any, Dict, Set, Tuple

import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TextIteratorStreamer,
)
from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, HTMLResponse
from pydantic import BaseModel, Field

# === Configuration ===
MODEL_ID = os.environ.get('MODEL_ID', 'FINAL-Bench/Darwin-35B-A3B-Opus')
MODEL_NAME = MODEL_ID.split('/')[-1]
HF_TOKEN = os.environ.get('HF_TOKEN', '').strip() or None
API_KEYS = {k.strip() for k in os.environ.get('API_KEYS', '').split(',') if k.strip()}
QUANT_MODE = os.environ.get('QUANT_MODE', 'int4').lower()

# Chat-control markers that must never reach the client, even when the
# tokenizer is asked to keep special tokens (as the streaming path does).
SPECIAL_TOKEN_RE = re.compile(
    r'<\|im_(?:start|end)\|>|<\|endoftext\|>|<\|startoftext\|>'
)


def log(msg: str) -> None:
    """Print *msg* to stdout with an HH:MM:SS prefix, flushing immediately."""
    print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)


def strip_special(text: str) -> str:
    """Return *text* with every SPECIAL_TOKEN_RE match removed."""
    return SPECIAL_TOKEN_RE.sub('', text)


# === Globals ===
# `model is not None` is the readiness flag checked by the endpoints;
# load_model() assigns it only after loading has fully completed.
model = None
tok = None
# Set by load_model() when loading raises, so endpoints can tell "failed"
# apart from "still loading".
load_error: Optional[str] = None
# Serializes every model.generate() call (streaming and non-streaming).
inference_lock = threading.Lock()


# === Pydantic schemas (OpenAI-compatible) ===
class ChatMessage(BaseModel):
    """One chat turn; content is a plain string or a list of typed parts."""

    role: str
    content: Union[str, List[Dict[str, Any]]]


class ChatCompletionRequest(BaseModel):
    """Request body accepted by POST /v1/chat/completions."""

    model: str = MODEL_NAME
    messages: List[ChatMessage]
    max_tokens: int = Field(default=1024, ge=1, le=8192)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    top_p: float = Field(default=0.95, ge=0.0, le=1.0)
    n: int = Field(default=1, ge=1, le=4)
    stream: bool = False
    stop: Optional[Union[str, List[str]]] = None
    seed: Optional[int] = None
    repetition_penalty: Optional[float] = Field(default=None, ge=1.0, le=2.0)


def _is_known_key(token: str) -> bool:
    """Return True when *token* equals one of API_KEYS.

    Each comparison uses hmac.compare_digest and the loop never exits early,
    so response time does not reveal how much of a key was guessed correctly.
    """
    candidate = token.encode('utf-8')
    matched = False
    for key in API_KEYS:
        if hmac.compare_digest(candidate, key.encode('utf-8')):
            matched = True
    return matched


def verify_api_key(authorization: Optional[str] = Header(None)) -> None:
    """FastAPI dependency enforcing bearer auth when API_KEYS is non-empty.

    Args:
        authorization: Raw ``Authorization`` request header, if present.

    Raises:
        HTTPException: 401 when the header is missing, is not of the form
            ``Bearer <key>``, or carries a key that is not in API_KEYS.
    """
    if not API_KEYS:
        return  # public
    if not authorization:
        raise HTTPException(401, 'Missing Authorization header. Use: Authorization: Bearer YOUR_API_KEY')
    if not authorization.lower().startswith('bearer '):
        raise HTTPException(401, 'Invalid Authorization format. Use: Bearer YOUR_API_KEY')
    token = authorization[7:].strip()
    if not _is_known_key(token):
        raise HTTPException(401, 'Invalid API key')


# === FastAPI ===
app = FastAPI(title=f'{MODEL_NAME} API', version='1.0')
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are enabled; bearer-token auth does not rely on cookies, so
# confirm whether credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)


@app.get('/health')
def health():
    """Report liveness, model-load state and basic CUDA information."""
    cuda_ok = torch.cuda.is_available()
    return {
        'status': 'ok',
        'model': MODEL_NAME,
        'loaded': model is not None,
        'load_error': load_error,
        'quant_mode': QUANT_MODE,
        'auth_required': len(API_KEYS) > 0,
        'cuda': cuda_ok,
        'cuda_device_count': torch.cuda.device_count() if cuda_ok else 0,
    }


@app.get('/v1/models')
def list_models():
    """Return an OpenAI-style model list containing only MODEL_NAME."""
    return {
        'object': 'list',
        'data': [{
            'id': MODEL_NAME,
            'object': 'model',
            'created': int(time.time()),
            'owned_by': 'FINAL-Bench',
        }],
    }


def _sse(payload: Dict[str, Any]) -> str:
    """Serialize *payload* as one server-sent-event ``data:`` frame."""
    return f'data: {json.dumps(payload)}\n\n'


def _chunk(cid: str, delta: Dict[str, Any], finish_reason: Optional[str]) -> Dict[str, Any]:
    """Build one ``chat.completion.chunk`` object for the stream *cid*."""
    return {
        'id': cid,
        'object': 'chat.completion.chunk',
        'created': int(time.time()),
        'model': MODEL_NAME,
        'choices': [{'index': 0, 'delta': delta, 'finish_reason': finish_reason}],
    }


def _stream_generate(inputs, gen_kwargs, seed=None):
    """Background thread + SSE generator for streaming responses.

    Args:
        inputs: Tokenized prompt tensors, passed to ``model.generate`` as
            keyword arguments.
        gen_kwargs: Generation keyword arguments; a streamer is added here.
        seed: Optional RNG seed. It is applied inside the inference lock,
            immediately before generation, so another request cannot consume
            the seeded RNG state in between.

    Returns:
        A generator yielding SSE frames: a role chunk, content chunks, then
        either a final ``stop`` chunk or an ``error`` frame, then ``[DONE]``.
    """
    streamer = TextIteratorStreamer(
        tok, skip_prompt=True, skip_special_tokens=False, timeout=600.0
    )
    gk = {**gen_kwargs, 'streamer': streamer}
    # Written by the worker before it ends the stream; read by the consumer
    # only after iteration over the streamer has finished.
    failures: List[str] = []

    def _run():
        with inference_lock:
            try:
                if seed is not None:
                    torch.manual_seed(seed)
                with torch.no_grad():
                    model.generate(**inputs, **gk)
            except Exception as e:
                log(f'stream gen FAIL: {e}')
                traceback.print_exc()
                failures.append(str(e))
                # generate() did not reach its own end-of-stream signal, so
                # the consumer would block until the streamer timeout.
                # End the stream explicitly to release it right away.
                try:
                    streamer.end()
                except Exception:
                    traceback.print_exc()

    t = threading.Thread(target=_run, daemon=True)
    t.start()

    def event_stream():
        cid = f'chatcmpl-{int(time.time()*1000)}'
        yield _sse(_chunk(cid, {'role': 'assistant'}, None))
        try:
            for chunk_text in streamer:
                if not chunk_text:
                    continue
                cleaned = strip_special(chunk_text)
                if not cleaned:
                    continue
                yield _sse(_chunk(cid, {'content': cleaned}, None))
        except queue.Empty:
            # The streamer raises queue.Empty when no text arrives within its
            # timeout (e.g. the worker is still waiting for the inference lock).
            log('stream gen FAIL: timed out waiting for generated text')
            failures.append('timed out waiting for generated text')
        if failures:
            yield _sse({'error': {'message': f'generate error: {failures[0]}',
                                  'type': 'server_error'}})
        else:
            yield _sse(_chunk(cid, {}, 'stop'))
        yield 'data: [DONE]\n\n'

    return event_stream()


def _flatten_content(content: Union[str, List[Dict[str, Any]]]) -> str:
    """Collapse message content into a single string.

    String content is returned unchanged. For list content only the
    ``type == 'text'`` parts are kept (no multimodal in v1) and joined with
    newlines.

    Raises:
        HTTPException: 400 when a text part carries a non-string ``text``.
    """
    if not isinstance(content, list):
        return content
    parts = []
    for item in content:
        if not (isinstance(item, dict) and item.get('type') == 'text'):
            continue
        text = item.get('text', '')
        if not isinstance(text, str):
            raise HTTPException(400, "content part 'text' must be a string")
        parts.append(text)
    return '\n'.join(parts)


def _truncate_at_stop(text: str, stop: Optional[Union[str, List[str]]]) -> Tuple[str, bool]:
    """Cut *text* before each stop sequence found, in the order given.

    Empty stop strings are skipped: ``str.find('')`` is always 0, so an empty
    stop would otherwise erase the whole completion.

    Returns:
        ``(text, hit)`` where *hit* is True if any truncation happened.
    """
    if not stop:
        return text, False
    stops = [stop] if isinstance(stop, str) else stop
    hit = False
    for s in stops:
        if not s:
            continue
        idx = text.find(s)
        if idx >= 0:
            text = text[:idx]
            hit = True
    return text, hit


def _eos_token_ids() -> Set[int]:
    """Collect EOS ids from the tokenizer and the model's generation config.

    Either source may hold a single id or a list of ids; missing values are
    ignored.
    """
    ids: Set[int] = set()
    gen_cfg = getattr(model, 'generation_config', None)
    for source in (getattr(tok, 'eos_token_id', None),
                   getattr(gen_cfg, 'eos_token_id', None)):
        if source is None:
            continue
        if isinstance(source, int):
            ids.add(source)
        else:
            ids.update(int(t) for t in source)
    return ids


def _completion_length(token_ids: List[int], eos_ids: Set[int]) -> Tuple[int, bool]:
    """Return ``(length, hit_eos)`` for one generated sequence.

    *length* counts tokens up to and including the first EOS. With
    ``num_return_sequences > 1`` shorter sequences are padded (the pad id is
    the tokenizer's EOS id here), so the raw row length would over-count.
    """
    for pos, token_id in enumerate(token_ids):
        if token_id in eos_ids:
            return pos + 1, True
    return len(token_ids), False


@app.post('/v1/chat/completions', dependencies=[Depends(verify_api_key)])
def chat_completions(req: ChatCompletionRequest):
    """OpenAI-compatible chat completion endpoint.

    Returns a ``chat.completion`` dict, or a ``StreamingResponse`` of SSE
    chunks when ``req.stream`` is true.

    Raises:
        HTTPException: 503 while the model is not loaded (or failed to load),
            400 for chat-template errors, malformed text parts, or ``n > 1``
            with ``temperature == 0``, and 500 when generation fails.
    """
    if model is None:
        if load_error is not None:
            raise HTTPException(503, 'Model failed to load; check server logs')
        raise HTTPException(503, 'Model still loading')

    # Convert messages — flatten content if it's a list
    msgs = [{'role': m.role, 'content': _flatten_content(m.content)} for m in req.messages]

    try:
        prompt = tok.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
    except Exception as e:
        raise HTTPException(400, f'chat_template error: {e}') from e

    inputs = tok(prompt, return_tensors='pt')
    input_device = next(model.parameters()).device
    inputs = {k: v.to(input_device) for k, v in inputs.items()}
    input_len = inputs['input_ids'].shape[1]

    do_sample = req.temperature > 0
    gen_kwargs = dict(
        max_new_tokens=req.max_tokens,
        do_sample=do_sample,
        # Greedy decoding ignores the sampling knobs; passing neutral values
        # keeps transformers from warning about unused sampling settings.
        temperature=req.temperature if do_sample else 1.0,
        top_p=req.top_p if do_sample else 1.0,
        pad_token_id=tok.eos_token_id,
    )
    if req.repetition_penalty and req.repetition_penalty > 1.0:
        gen_kwargs['repetition_penalty'] = req.repetition_penalty

    # Streaming branch
    if req.stream:
        log(f'STREAM start: in={input_len} max={req.max_tokens}')
        return StreamingResponse(
            _stream_generate(inputs, gen_kwargs, seed=req.seed),
            media_type='text/event-stream',
            headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
        )

    # Non-streaming
    if req.n > 1:
        if not do_sample:
            # transformers rejects several return sequences under greedy
            # decoding; report it as a client error rather than a 500.
            raise HTTPException(400, 'n > 1 requires temperature > 0')
        gen_kwargs['num_return_sequences'] = req.n

    with inference_lock:
        t0 = time.time()
        try:
            # Seed under the lock so no other request can consume the seeded
            # RNG state before this generation runs.
            if req.seed is not None:
                torch.manual_seed(req.seed)
            with torch.no_grad():
                outputs = model.generate(**inputs, **gen_kwargs)
        except Exception as e:
            log(f'generate FAIL: {e}')
            traceback.print_exc()
            raise HTTPException(500, f'generate error: {e}') from e
        elapsed = time.time() - t0

    eos_ids = _eos_token_ids()
    choices = []
    total_completion = 0
    for i in range(req.n):
        gen_ids = outputs[i][input_len:].tolist()
        ct, hit_eos = _completion_length(gen_ids, eos_ids)
        text = tok.decode(gen_ids[:ct], skip_special_tokens=True)
        text = strip_special(text).strip()
        text, hit_stop = _truncate_at_stop(text, req.stop)
        total_completion += ct
        finished = hit_eos or hit_stop or ct < req.max_tokens
        choices.append({
            'index': i,
            'message': {'role': 'assistant', 'content': text},
            'finish_reason': 'stop' if finished else 'length',
        })

    log(f'chat_completions: in={input_len} gen={total_completion} n={req.n} {elapsed:.1f}s')

    return {
        'id': f'chatcmpl-{int(time.time()*1000)}',
        'object': 'chat.completion',
        'created': int(time.time()),
        'model': MODEL_NAME,
        'choices': choices,
        'usage': {
            'prompt_tokens': input_len,
            'completion_tokens': total_completion,
            'total_tokens': input_len + total_completion,
        },
    }


# === Landing page (HTML) ===
@app.get('/', response_class=HTMLResponse)
def root():
    """Serve the landing page with load state, auth note and usage examples."""
    state = 'loaded' if model is not None else 'loading...'
    auth_note = 'Bearer API key required' if API_KEYS else 'No auth (public)'
    return f""" {MODEL_NAME} API

🧬 {MODEL_NAME} API

35B MoE 3B active {QUANT_MODE.upper()} OpenAI-compatible {state}

Self-hosted FastAPI inference server for FINAL-Bench/Darwin-35B-A3B-Opus.
Auth: {auth_note}

🔌 Endpoints

💻 Example (curl)

curl https://final-bench-darwin-35b-a3b-opus-api.hf.space/v1/chat/completions \\
  -H "Authorization: Bearer YOUR_API_KEY" \\
  -H "Content-Type: application/json" \\
  -d '{{"model":"{MODEL_NAME}","messages":[{{"role":"user","content":"Explain SN2 reaction"}}],"max_tokens":500}}'

🐍 Example (Python OpenAI SDK)

from openai import OpenAI
client = OpenAI(
    api_key="YOUR_API_KEY",
    base_url="https://final-bench-darwin-35b-a3b-opus-api.hf.space/v1",
)
resp = client.chat.completions.create(
    model="{MODEL_NAME}",
    messages=[{{"role": "user", "content": "What is GPQA?"}}],
    max_tokens=300,
)
print(resp.choices[0].message.content)

🌊 Streaming

stream = client.chat.completions.create(
    model="{MODEL_NAME}",
    messages=[{{"role":"user","content":"Write a Python function"}}],
    max_tokens=500,
    stream=True,
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

📊 Health check

curl https://final-bench-darwin-35b-a3b-opus-api.hf.space/health
"""


# === Model loading ===
def load_model():
    """Load tokenizer and model into the ``tok`` / ``model`` globals.

    ``model`` is assigned last, after ``eval()`` and the diagnostics, because
    the endpoints treat ``model is not None`` as "ready to serve". On failure
    the error is stored in ``load_error`` and the exception is re-raised.
    """
    global model, tok, load_error
    try:
        if QUANT_MODE not in ('int4', 'int8', 'bf16'):
            log(f'WARNING: unrecognized QUANT_MODE {QUANT_MODE!r}; loading unquantized bf16')

        log(f'Loading tokenizer from {MODEL_ID}...')
        tok = AutoTokenizer.from_pretrained(
            MODEL_ID, trust_remote_code=True, token=HF_TOKEN
        )
        log(f' vocab={tok.vocab_size}, type={type(tok).__name__}')

        log(f'Loading model in {QUANT_MODE} mode...')
        t0 = time.time()
        kwargs: Dict[str, Any] = {
            'trust_remote_code': True,
            'token': HF_TOKEN,
            'device_map': 'auto',
            'low_cpu_mem_usage': True,
        }
        quantized = QUANT_MODE in ('int8', 'int4')
        if QUANT_MODE == 'int8':
            kwargs['quantization_config'] = BitsAndBytesConfig(load_in_8bit=True)
        elif QUANT_MODE == 'int4':
            kwargs['quantization_config'] = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_quant_type='nf4',
                bnb_4bit_use_double_quant=True,
            )
        # else: bf16 full precision (requires ~72GB GPU)

        # Try new "dtype" arg first (transformers >=4.46), fall back to "torch_dtype"
        try:
            if not quantized:
                kwargs['dtype'] = torch.bfloat16
            loaded = AutoModelForCausalLM.from_pretrained(MODEL_ID, **kwargs)
        except TypeError:
            if quantized:
                # No dtype argument was passed, so a retry would repeat the
                # exact same call; surface the error instead.
                raise
            kwargs.pop('dtype', None)
            kwargs['torch_dtype'] = torch.bfloat16
            loaded = AutoModelForCausalLM.from_pretrained(MODEL_ID, **kwargs)

        loaded.eval()
        log(f'Loaded in {(time.time()-t0)/60:.1f} min')
        log(f' class: {type(loaded).__name__}')
        log(f' total params: {sum(p.numel() for p in loaded.parameters())/1e9:.2f}B')
        if torch.cuda.is_available():
            for i in range(torch.cuda.device_count()):
                free, total = torch.cuda.mem_get_info(i)
                log(f' GPU{i}: {(total-free)/1e9:.1f}/{total/1e9:.0f} GB used')

        model = loaded  # publish last: this flips the readiness flag
        log('=== Ready ===')
    except Exception as e:
        load_error = f'{type(e).__name__}: {e}'
        log(f'model load FAIL: {load_error}')
        raise


log(f'=== {MODEL_NAME} API Server starting ===')
log(f' MODEL_ID: {MODEL_ID}')
log(f' QUANT_MODE: {QUANT_MODE}')
log(f' API_KEYS: {len(API_KEYS)} configured (auth {"required" if API_KEYS else "DISABLED — public"})')
log(f' HF_TOKEN: {"set" if HF_TOKEN else "(none)"}')

# Launch model load in background thread (uvicorn starts immediately, /health works)
threading.Thread(target=load_model, daemon=True).start()