import asyncio
import json
import logging
import os
import time
import uuid
from pathlib import Path
from typing import Optional
# Configure the root logger once at import time: INFO and above, with a
# timestamp, level and logger name in front of every message.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)
from dotenv import load_dotenv
# Automatically load the .env file located in this module's directory
# (the project directory).
load_dotenv(Path(__file__).parent / ".env")
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from account_manager import AccountManager
from config import Config, load_config
# ASGI application that every route in this module is registered on.
app = FastAPI(title="DS2API Browser")

# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# usually discouraged -- confirm this is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration is loaded once at import time and shared by all handlers.
config: Config = load_config()

# Single account manager shared by all requests; it is created with
# max_inflight=2 (the exact meaning is defined by AccountManager).
manager = AccountManager(max_inflight=2)
class Message(BaseModel):
    """One chat message: the speaker's role and the message text."""

    role: str
    content: str
class ChatCompletionRequest(BaseModel):
    """Request body of the ``/v1/chat/completions`` endpoint.

    ``stream`` defaults to ``False``. ``temperature`` and ``max_tokens`` are
    optional; the chat handler visible in this file does not read them.
    """

    model: str
    messages: list[Message]
    stream: bool = False
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
def verify_api_key(authorization: Optional[str] = Header(None)) -> str:
    """Validate an ``Authorization`` header value and return the API key.

    The value may be ``Bearer <key>`` (scheme matched case-insensitively) or
    a bare key with no scheme.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None``.

    Returns:
        The API key extracted from the header.

    Raises:
        HTTPException: 401 if the header is missing or empty, or if the
            extracted key is empty or not present in ``config.api_keys``.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing API key")

    token = authorization.strip()
    # Strip the scheme only when it is a real prefix; a plain str.replace
    # would also remove "Bearer " occurring in the middle of the key.
    scheme, separator, credentials = token.partition(" ")
    if separator and scheme.lower() == "bearer":
        token = credentials.strip()

    # An empty token must never authenticate, even if the configured key
    # collection happens to contain an empty entry.
    if not token or token not in config.api_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return token
@app.get("/v1/models")
async def list_models(authorization: str = Header(...)):
    """Return the list of model ids this service advertises.

    Raises:
        HTTPException: 401 from ``verify_api_key`` when the key is rejected.
    """
    verify_api_key(authorization)
    model_ids = ("deepseek-flash", "deepseek-pro")
    entries = [
        {
            "id": model_id,
            "object": "model",
            # The creation time is simply "now" at the moment of the request.
            "created": int(time.time()),
            "owned_by": "deepseek",
        }
        for model_id in model_ids
    ]
    return {"data": entries, "object": "list"}
@app.get("/v1/models/{model_id}")
async def get_model(model_id: str, authorization: str = Header(...)):
    """Return the description of a single model by id.

    Raises:
        HTTPException: 401 from ``verify_api_key`` when the key is rejected;
            404 when ``model_id`` is not one of the known models.
    """
    verify_api_key(authorization)
    catalog = {
        known_id: {
            "id": known_id,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "deepseek",
        }
        for known_id in ("deepseek-flash", "deepseek-pro")
    }
    if model_id not in catalog:
        raise HTTPException(status_code=404, detail="Model not found")
    return catalog[model_id]
@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    authorization: str = Header(...),
):
    """Serve a chat completion through a browser session of a pooled account.

    Only the content of the last message is sent as the prompt. When
    ``request.stream`` is true the reply is a ``text/event-stream`` of
    ``chat.completion.chunk`` events terminated by ``data: [DONE]``;
    otherwise a single ``chat.completion`` object is returned whose usage
    numbers are whitespace-split word counts, not real token counts.

    Raises:
        HTTPException: 401 from ``verify_api_key``; 400 when ``messages`` is
            empty; 503 when obtaining the browser or sending the message
            fails.
    """
    verify_api_key(authorization)
    if not request.messages:
        raise HTTPException(status_code=400, detail="No messages provided")

    prompt = request.messages[-1].content
    model = request.model

    def sse_chunk(chunk_id, delta, finish_reason):
        """Format one ``chat.completion.chunk`` payload as an SSE data line."""
        payload = {
            "id": chunk_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        }
        return f"data: {json.dumps(payload)}\n\n"

    account = await manager.acquire()
    try:
        browser = await manager.get_or_create_browser_with_retry(
            account, headless=config.browser.headless
        )

        if request.stream:

            async def stream_with_cleanup():
                # The account stays acquired while the client consumes the
                # stream; it is released when the generator finishes.
                chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
                try:
                    async for chunk in browser.stream_message(
                        prompt, timeout=120, model=model
                    ):
                        yield sse_chunk(chunk_id, {"content": chunk}, None)
                    yield sse_chunk(chunk_id, {}, "stop")
                    yield "data: [DONE]\n\n"
                except Exception as e:
                    # Headers are already sent, so the failure can only be
                    # reported in-band; log it so it is not lost server-side.
                    logger.exception(
                        "Streaming error for model=%s", request.model
                    )
                    yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n"
                finally:
                    await manager.release(account)

            return StreamingResponse(
                stream_with_cleanup(),
                media_type="text/event-stream",
            )

        response_text = await browser.send_message(
            prompt, timeout=120, model=model
        )
        await manager.release(account)

        # Token counts are estimated by word splitting; not exact tokenization
        prompt_tokens = len(prompt.split())
        completion_tokens = len(response_text.split())
        return {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": response_text},
                    "finish_reason": "stop",
                }
            ],
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        }
    except Exception as e:
        # NOTE(review): on this path only mark_error() is called, never
        # release(); presumably mark_error() frees the account -- confirm
        # against AccountManager, otherwise failed requests leak a slot.
        await manager.mark_error(account)
        logger.exception(
            "Chat completion error for model=%s: %s", request.model, e
        )
        raise HTTPException(status_code=503, detail=str(e)) from e
@app.get("/healthz")
async def healthz():
    """Liveness probe: always answers with a static ``ok`` status."""
    payload = {"status": "ok"}
    return payload
@app.get("/readyz")
async def readyz():
    """Report an ``ok`` status together with selected account-pool counters.

    The counters are copied from ``manager.get_stats()``; a missing key in
    that mapping raises ``KeyError``.
    """
    stats = manager.get_stats()
    exposed = ("total", "in_use", "available", "logged_in", "muted", "queue_size")
    return {
        "status": "ok",
        "accounts": {field: stats[field] for field in exposed},
    }
@app.get("/admin/stats")
async def admin_stats(admin_key: str = Header(...)):
    """Return the raw statistics from the account manager.

    Raises:
        HTTPException: 401 when ``admin_key`` differs from
            ``config.server.admin_key``.
    """
    if admin_key == config.server.admin_key:
        return manager.get_stats()
    raise HTTPException(status_code=401, detail="Invalid admin key")
@app.post("/admin/accounts/import")
async def import_accounts(request: Request, admin_key: str = Header(...)):
    """Add accounts from a JSON body of the form ``{"accounts": [...]}``.

    Each entry is expected to be an object with ``email`` and ``password``
    and optionally ``name`` and ``proxy``. Entries that are not objects or
    that lack an email or password are skipped.

    Returns:
        A dict with ``success``, the number of entries ``imported`` by this
        call, and the ``total`` number of accounts held by the manager.

    Raises:
        HTTPException: 401 when ``admin_key`` is wrong; 400 when the body is
            not valid JSON, is not an object, or holds no account list.
    """
    if admin_key != config.server.admin_key:
        raise HTTPException(status_code=401, detail="Invalid admin key")

    try:
        body = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; report a client error instead of an unhandled 500.
        raise HTTPException(status_code=400, detail="Invalid JSON body") from exc

    # The payload is untrusted: only a JSON object with a non-empty list
    # under "accounts" is acceptable.
    accounts = body.get("accounts") if isinstance(body, dict) else None
    if not isinstance(accounts, list) or not accounts:
        raise HTTPException(status_code=400, detail="No accounts provided")

    imported = 0
    for acc in accounts:
        if not isinstance(acc, dict):
            continue
        email = acc.get("email")
        password = acc.get("password")
        if email and password:
            manager.add_account(
                email, password, acc.get("name", ""), acc.get("proxy")
            )
            imported += 1
    return {"success": True, "imported": imported, "total": len(manager.accounts)}
@app.get("/admin/accounts")
async def list_accounts(admin_key: str = Header(...)):
    """List every account known to the manager with its status fields.

    Passwords and proxies are not included in the response.

    Raises:
        HTTPException: 401 when ``admin_key`` differs from
            ``config.server.admin_key``.
    """
    if admin_key != config.server.admin_key:
        raise HTTPException(status_code=401, detail="Invalid admin key")

    rows = [
        {
            "email": address,
            "name": entry.name,
            "in_use": entry.in_use,
            "logged_in": entry.logged_in,
            "is_muted": entry.is_muted,
            "muted_until": entry.muted_until,
            "error_count": entry.error_count,
        }
        for address, entry in manager.accounts.items()
    ]
    return {"accounts": rows, "total": len(rows)}
@app.get("/")
async def admin_panel():
    """Serve the contents of ``ADMIN_HTML`` as an HTML response."""
    # Imported at call time, as in the original handler.
    from fastapi.responses import HTMLResponse

    page = HTMLResponse(content=ADMIN_HTML)
    return page
@app.on_event("startup")
async def startup():
    """Register every account from the loaded config with the manager."""
    for entry in config.accounts:
        manager.add_account(
            email=entry.email, password=entry.password, name=entry.name, proxy=entry.proxy
        )
    loaded = len(config.accounts)
    logger.info("Loaded %d accounts", loaded)
# Body returned by the "/" route, which wraps it in an HTMLResponse.
# NOTE(review): this literal holds only visible text and contains no HTML
# tags, styles or scripts -- confirm the page markup was not lost.
ADMIN_HTML = """
DS2API · 控制台
▸ DS2API
浏览器模式
账号 —
活跃 —
可用 —
在线 —
排队 —
接口测试
/v1/chat/completions
"""
def main():
    """Run the application under uvicorn on the configured host and port."""
    # uvicorn is imported here, so it is only needed when running as a script.
    import uvicorn

    bind = {"host": config.server.host, "port": config.server.port}
    uvicorn.run(app, **bind)


if __name__ == "__main__":
    main()