SeaWolf-AI's picture
v1: FastAPI OpenAI-compatible Darwin-35B-A3B-Opus API (INT4, Docker)
2893ee9 verified
#!/usr/bin/env python3
"""Darwin-35B-A3B-Opus FastAPI Server β€” HF Space (Docker SDK)
OpenAI-compatible chat completions endpoint with optional bearer auth.
INT4 quantization (default) fits 35B MoE into ~18GB β†’ runs on L4/A10G/L40S.
Environment variables:
MODEL_ID β€” HuggingFace model id (default: FINAL-Bench/Darwin-35B-A3B-Opus)
HF_TOKEN β€” HuggingFace token (for private/gated models)
API_KEYS β€” Comma-separated bearer keys (empty = public, no auth)
QUANT_MODE β€” int4 (default) | int8 | bf16
"""
import os
import re
import time
import json
import threading
import traceback
from typing import List, Optional, Union, Any, Dict
import torch
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
BitsAndBytesConfig,
TextIteratorStreamer,
)
from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, HTMLResponse
from pydantic import BaseModel, Field
# === Configuration ===
MODEL_ID = os.environ.get('MODEL_ID', 'FINAL-Bench/Darwin-35B-A3B-Opus')
MODEL_NAME = MODEL_ID.split('/')[-1]
HF_TOKEN = os.environ.get('HF_TOKEN', '').strip() or None
API_KEYS = set(k.strip() for k in os.environ.get('API_KEYS', '').split(',') if k.strip())
QUANT_MODE = os.environ.get('QUANT_MODE', 'int4').lower()
SPECIAL_TOKEN_RE = re.compile(
r'<\|im_(?:start|end)\|>|<\|endoftext\|>|<\|startoftext\|>'
)
def log(msg: str) -> None:
print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)
def strip_special(text: str) -> str:
return SPECIAL_TOKEN_RE.sub('', text)
# === Globals ===
model = None
tok = None
inference_lock = threading.Lock()
# === Pydantic schemas (OpenAI-compatible) ===
class ChatMessage(BaseModel):
role: str
content: Union[str, List[Dict[str, Any]]]
class ChatCompletionRequest(BaseModel):
model: str = MODEL_NAME
messages: List[ChatMessage]
max_tokens: int = Field(default=1024, ge=1, le=8192)
temperature: float = Field(default=0.7, ge=0.0, le=2.0)
top_p: float = Field(default=0.95, ge=0.0, le=1.0)
n: int = Field(default=1, ge=1, le=4)
stream: bool = False
stop: Optional[Union[str, List[str]]] = None
seed: Optional[int] = None
repetition_penalty: Optional[float] = Field(default=None, ge=1.0, le=2.0)
def verify_api_key(authorization: Optional[str] = Header(None)) -> None:
if not API_KEYS:
return # public
if not authorization:
raise HTTPException(401, 'Missing Authorization header. Use: Authorization: Bearer YOUR_API_KEY')
if not authorization.lower().startswith('bearer '):
raise HTTPException(401, 'Invalid Authorization format. Use: Bearer YOUR_API_KEY')
token = authorization[7:].strip()
if token not in API_KEYS:
raise HTTPException(401, 'Invalid API key')
# === FastAPI ===
app = FastAPI(title=f'{MODEL_NAME} API', version='1.0')
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
@app.get('/health')
def health():
return {
'status': 'ok',
'model': MODEL_NAME,
'loaded': model is not None,
'quant_mode': QUANT_MODE,
'auth_required': len(API_KEYS) > 0,
'cuda': torch.cuda.is_available(),
'cuda_device_count': torch.cuda.device_count() if torch.cuda.is_available() else 0,
}
@app.get('/v1/models')
def list_models():
return {
'object': 'list',
'data': [{
'id': MODEL_NAME,
'object': 'model',
'created': int(time.time()),
'owned_by': 'FINAL-Bench',
}],
}
def _stream_generate(inputs, gen_kwargs):
"""Background thread + SSE generator for streaming responses."""
streamer = TextIteratorStreamer(
tok, skip_prompt=True, skip_special_tokens=False, timeout=600.0
)
gk = {**gen_kwargs, 'streamer': streamer}
def _run():
with inference_lock:
try:
with torch.no_grad():
model.generate(**inputs, **gk)
except Exception as e:
log(f'stream gen FAIL: {e}')
traceback.print_exc()
t = threading.Thread(target=_run, daemon=True)
t.start()
def event_stream():
cid = f'chatcmpl-{int(time.time()*1000)}'
first = {
'id': cid, 'object': 'chat.completion.chunk',
'created': int(time.time()), 'model': MODEL_NAME,
'choices': [{'index': 0, 'delta': {'role': 'assistant'}, 'finish_reason': None}],
}
yield f'data: {json.dumps(first)}\n\n'
for chunk_text in streamer:
if not chunk_text:
continue
cleaned = strip_special(chunk_text)
if not cleaned:
continue
delta = {
'id': cid, 'object': 'chat.completion.chunk',
'created': int(time.time()), 'model': MODEL_NAME,
'choices': [{'index': 0, 'delta': {'content': cleaned}, 'finish_reason': None}],
}
yield f'data: {json.dumps(delta)}\n\n'
last = {
'id': cid, 'object': 'chat.completion.chunk',
'created': int(time.time()), 'model': MODEL_NAME,
'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}],
}
yield f'data: {json.dumps(last)}\n\n'
yield 'data: [DONE]\n\n'
return event_stream()
@app.post('/v1/chat/completions', dependencies=[Depends(verify_api_key)])
def chat_completions(req: ChatCompletionRequest):
if model is None:
raise HTTPException(503, 'Model still loading')
# Convert messages β€” flatten content if it's a list
msgs = []
for m in req.messages:
content = m.content
if isinstance(content, list):
# Take text-typed items only (no multimodal in v1)
parts = [it.get('text', '') for it in content if isinstance(it, dict) and it.get('type') == 'text']
content = '\n'.join(parts)
msgs.append({'role': m.role, 'content': content})
try:
prompt = tok.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
except Exception as e:
raise HTTPException(400, f'chat_template error: {e}')
inputs = tok(prompt, return_tensors='pt')
input_device = next(model.parameters()).device
inputs = {k: v.to(input_device) for k, v in inputs.items()}
input_len = inputs['input_ids'].shape[1]
if req.seed is not None:
torch.manual_seed(req.seed)
do_sample = req.temperature > 0
gen_kwargs = dict(
max_new_tokens=req.max_tokens,
do_sample=do_sample,
temperature=req.temperature if do_sample else 1.0,
top_p=req.top_p,
pad_token_id=tok.eos_token_id,
)
if req.repetition_penalty and req.repetition_penalty > 1.0:
gen_kwargs['repetition_penalty'] = req.repetition_penalty
# Streaming branch
if req.stream:
log(f'STREAM start: in={input_len} max={req.max_tokens}')
return StreamingResponse(
_stream_generate(inputs, gen_kwargs),
media_type='text/event-stream',
headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
)
# Non-streaming
if req.n > 1:
gen_kwargs['num_return_sequences'] = req.n
with inference_lock:
t0 = time.time()
with torch.no_grad():
try:
outputs = model.generate(**inputs, **gen_kwargs)
except Exception as e:
log(f'generate FAIL: {e}')
traceback.print_exc()
raise HTTPException(500, f'generate error: {e}')
elapsed = time.time() - t0
choices = []
total_completion = 0
for i in range(req.n):
gen = outputs[i][input_len:]
text = tok.decode(gen, skip_special_tokens=True)
text = strip_special(text).strip()
if req.stop:
stops = [req.stop] if isinstance(req.stop, str) else req.stop
for s in stops:
idx = text.find(s)
if idx >= 0:
text = text[:idx]
ct = int(len(gen))
total_completion += ct
choices.append({
'index': i,
'message': {'role': 'assistant', 'content': text},
'finish_reason': 'stop' if ct < req.max_tokens else 'length',
})
log(f'chat_completions: in={input_len} gen={total_completion} n={req.n} {elapsed:.1f}s')
return {
'id': f'chatcmpl-{int(time.time()*1000)}',
'object': 'chat.completion',
'created': int(time.time()),
'model': MODEL_NAME,
'choices': choices,
'usage': {
'prompt_tokens': input_len,
'completion_tokens': total_completion,
'total_tokens': input_len + total_completion,
},
}
# === Landing page (HTML) ===
@app.get('/', response_class=HTMLResponse)
def root():
state = 'loaded' if model is not None else 'loading...'
auth_note = 'Bearer API key required' if API_KEYS else 'No auth (public)'
return f"""<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>{MODEL_NAME} API</title>
<style>
*{{margin:0;padding:0;box-sizing:border-box}}
body{{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;max-width:900px;margin:40px auto;padding:0 24px;line-height:1.65;color:#1f2937;background:#f9fafb}}
h1{{color:#4338ca;font-size:32px;margin-bottom:8px}}
h2{{margin:32px 0 12px;color:#1e293b;border-bottom:2px solid #e5e7eb;padding-bottom:6px;font-size:20px}}
pre{{background:#1e293b;color:#e2e8f0;padding:16px 18px;border-radius:8px;overflow-x:auto;font-size:13px;line-height:1.55}}
code{{background:#eef2ff;color:#4338ca;padding:2px 7px;border-radius:4px;font-family:'JetBrains Mono',Consolas,monospace;font-size:0.93em}}
pre code{{background:transparent;color:inherit;padding:0}}
.badge{{display:inline-block;padding:4px 12px;background:#dbeafe;color:#1e40af;border-radius:12px;font-size:12px;margin-right:6px;font-weight:500}}
.status{{display:inline-block;padding:4px 12px;border-radius:12px;font-size:12px;font-weight:600}}
.status.ok{{background:#dcfce7;color:#166534}}.status.warn{{background:#fef3c7;color:#92400e}}
ul{{padding-left:24px;margin:10px 0}}li{{margin:6px 0}}
a{{color:#4338ca;text-decoration:none}}a:hover{{text-decoration:underline}}
.card{{background:white;border:1px solid #e5e7eb;border-radius:10px;padding:20px;margin:16px 0}}
.footer{{margin-top:50px;padding-top:20px;border-top:1px solid #e5e7eb;color:#6b7280;font-size:13px;text-align:center}}
</style></head>
<body>
<h1>🧬 {MODEL_NAME} API</h1>
<p>
<span class="badge">35B MoE</span>
<span class="badge">3B active</span>
<span class="badge">{QUANT_MODE.upper()}</span>
<span class="badge">OpenAI-compatible</span>
<span class="status {'ok' if model is not None else 'warn'}">{state}</span>
</p>
<p>Self-hosted FastAPI inference server for FINAL-Bench/Darwin-35B-A3B-Opus.<br/>
Auth: <strong>{auth_note}</strong></p>
<h2>πŸ”Œ Endpoints</h2>
<ul>
<li><code>GET /health</code> β€” health + load status</li>
<li><code>GET /v1/models</code> β€” list available models</li>
<li><code>POST /v1/chat/completions</code> β€” chat (OpenAI compat, supports streaming)</li>
</ul>
<h2>πŸ’» Example (curl)</h2>
<pre><code>curl https://final-bench-darwin-35b-a3b-opus-api.hf.space/v1/chat/completions \\
-H "Authorization: Bearer YOUR_API_KEY" \\
-H "Content-Type: application/json" \\
-d '{{"model":"{MODEL_NAME}","messages":[{{"role":"user","content":"Explain SN2 reaction"}}],"max_tokens":500}}'</code></pre>
<h2>🐍 Example (Python OpenAI SDK)</h2>
<pre><code>from openai import OpenAI
client = OpenAI(
api_key="YOUR_API_KEY",
base_url="https://final-bench-darwin-35b-a3b-opus-api.hf.space/v1",
)
resp = client.chat.completions.create(
model="{MODEL_NAME}",
messages=[{{"role": "user", "content": "What is GPQA?"}}],
max_tokens=300,
)
print(resp.choices[0].message.content)</code></pre>
<h2>🌊 Streaming</h2>
<pre><code>stream = client.chat.completions.create(
model="{MODEL_NAME}",
messages=[{{"role":"user","content":"Write a Python function"}}],
max_tokens=500,
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)</code></pre>
<div class="card">
<h2 style="border:none;margin-top:0">πŸ“Š Health check</h2>
<pre><code>curl https://final-bench-darwin-35b-a3b-opus-api.hf.space/health</code></pre>
</div>
<div class="footer">
Powered by <strong>FINAL-Bench</strong> Β· Model: <a href="https://huggingface.co/{MODEL_ID}">{MODEL_ID}</a>
</div>
</body></html>"""
# === Model loading ===
def load_model():
global model, tok
log(f'Loading tokenizer from {MODEL_ID}...')
tok = AutoTokenizer.from_pretrained(
MODEL_ID, trust_remote_code=True, token=HF_TOKEN
)
log(f' vocab={tok.vocab_size}, type={type(tok).__name__}')
log(f'Loading model in {QUANT_MODE} mode...')
t0 = time.time()
kwargs: Dict[str, Any] = {
'trust_remote_code': True,
'token': HF_TOKEN,
'device_map': 'auto',
'low_cpu_mem_usage': True,
}
if QUANT_MODE == 'int8':
kwargs['quantization_config'] = BitsAndBytesConfig(load_in_8bit=True)
elif QUANT_MODE == 'int4':
kwargs['quantization_config'] = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_quant_type='nf4',
bnb_4bit_use_double_quant=True,
)
else:
# bf16 full precision (requires ~72GB GPU)
pass
# Try new "dtype" arg first (transformers >=4.46), fall back to "torch_dtype"
try:
if QUANT_MODE not in ('int8', 'int4'):
kwargs['dtype'] = torch.bfloat16
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **kwargs)
except TypeError:
kwargs.pop('dtype', None)
if QUANT_MODE not in ('int8', 'int4'):
kwargs['torch_dtype'] = torch.bfloat16
model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **kwargs)
model.eval()
log(f'Loaded in {(time.time()-t0)/60:.1f} min')
log(f' class: {type(model).__name__}')
log(f' total params: {sum(p.numel() for p in model.parameters())/1e9:.2f}B')
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
free, total = torch.cuda.mem_get_info(i)
log(f' GPU{i}: {(total-free)/1e9:.1f}/{total/1e9:.0f} GB used')
log('=== Ready ===')
log(f'=== {MODEL_NAME} API Server starting ===')
log(f' MODEL_ID: {MODEL_ID}')
log(f' QUANT_MODE: {QUANT_MODE}')
log(f' API_KEYS: {len(API_KEYS)} configured (auth {"required" if API_KEYS else "DISABLED β€” public"})')
log(f' HF_TOKEN: {"set" if HF_TOKEN else "(none)"}')
# Launch model load in background thread (uvicorn starts immediately, /health works)
threading.Thread(target=load_model, daemon=True).start()