import asyncio import json import logging import os import time import uuid from pathlib import Path from typing import Optional logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger(__name__) from dotenv import load_dotenv # 自动加载项目目录下的 .env load_dotenv(Path(__file__).parent / ".env") from fastapi import FastAPI, HTTPException, Header, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel from account_manager import AccountManager from config import Config, load_config app = FastAPI(title="DS2API Browser") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) config: Config = load_config() manager = AccountManager(max_inflight=2) class Message(BaseModel): role: str content: str class ChatCompletionRequest(BaseModel): model: str messages: list[Message] stream: bool = False temperature: Optional[float] = None max_tokens: Optional[int] = None def verify_api_key(authorization: Optional[str] = Header(None)) -> str: if not authorization: raise HTTPException(status_code=401, detail="Missing API key") token = authorization.replace("Bearer ", "").strip() if token not in config.api_keys: raise HTTPException(status_code=401, detail="Invalid API key") return token @app.get("/v1/models") async def list_models(authorization: str = Header(...)): verify_api_key(authorization) return { "data": [ {"id": "deepseek-flash", "object": "model", "created": int(time.time()), "owned_by": "deepseek"}, {"id": "deepseek-pro", "object": "model", "created": int(time.time()), "owned_by": "deepseek"}, ], "object": "list", } @app.get("/v1/models/{model_id}") async def get_model(model_id: str, authorization: str = Header(...)): verify_api_key(authorization) models = { "deepseek-flash": {"id": "deepseek-flash", "object": "model", "created": int(time.time()), "owned_by": "deepseek"}, "deepseek-pro": {"id": "deepseek-pro", "object": "model", "created": int(time.time()), "owned_by": "deepseek"}, } if model_id in models: return models[model_id] raise HTTPException(status_code=404, detail="Model not found") @app.post("/v1/chat/completions") async def chat_completions( request: ChatCompletionRequest, authorization: str = Header(...), ): verify_api_key(authorization) if not request.messages: raise HTTPException(status_code=400, detail="No messages provided") prompt = request.messages[-1].content model = request.model account = await manager.acquire() try: browser = await manager.get_or_create_browser_with_retry(account, headless=config.browser.headless) if request.stream: async def stream_with_cleanup(): chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}" try: async for chunk in browser.stream_message(prompt, timeout=120, model=model): data = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model, "choices": [ { "index": 0, "delta": {"content": chunk}, "finish_reason": None, } ], } yield f"data: {json.dumps(data)}\n\n" final_data = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model, "choices": [ { "index": 0, "delta": {}, "finish_reason": "stop", } ], } yield f"data: {json.dumps(final_data)}\n\n" yield "data: [DONE]\n\n" except Exception as e: yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n" finally: await manager.release(account) return StreamingResponse( stream_with_cleanup(), media_type="text/event-stream", ) response_text = await browser.send_message(prompt, timeout=120, model=model) await manager.release(account) # Token counts are estimated by word splitting; not exact tokenization prompt_tokens = len(prompt.split()) completion_tokens = len(response_text.split()) return { "id": f"chatcmpl-{uuid.uuid4().hex[:8]}", "object": "chat.completion", "created": int(time.time()), "model": request.model, "choices": [ { "index": 0, "message": {"role": "assistant", "content": response_text}, "finish_reason": "stop", } ], "usage": { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens, }, } except Exception as e: await manager.mark_error(account) logger.error("Chat completion error for model=%s: %s", request.model, e) raise HTTPException(status_code=503, detail=str(e)) @app.get("/healthz") async def healthz(): return {"status": "ok"} @app.get("/readyz") async def readyz(): stats = manager.get_stats() return { "status": "ok", "accounts": { "total": stats["total"], "in_use": stats["in_use"], "available": stats["available"], "logged_in": stats["logged_in"], "muted": stats["muted"], "queue_size": stats["queue_size"], }, } @app.get("/admin/stats") async def admin_stats(admin_key: str = Header(...)): if admin_key != config.server.admin_key: raise HTTPException(status_code=401, detail="Invalid admin key") return manager.get_stats() @app.post("/admin/accounts/import") async def import_accounts(request: Request, admin_key: str = Header(...)): if admin_key != config.server.admin_key: raise HTTPException(status_code=401, detail="Invalid admin key") body = await request.json() accounts = body.get("accounts", []) if not accounts: raise HTTPException(status_code=400, detail="No accounts provided") imported = 0 for acc in accounts: email = acc.get("email") password = acc.get("password") name = acc.get("name", "") proxy = acc.get("proxy") if email and password: manager.add_account(email, password, name, proxy) imported += 1 return {"success": True, "imported": imported, "total": len(manager.accounts)} @app.get("/admin/accounts") async def list_accounts(admin_key: str = Header(...)): if admin_key != config.server.admin_key: raise HTTPException(status_code=401, detail="Invalid admin key") accounts = [] for email, acc in manager.accounts.items(): accounts.append({ "email": email, "name": acc.name, "in_use": acc.in_use, "logged_in": acc.logged_in, "is_muted": acc.is_muted, "muted_until": acc.muted_until, "error_count": acc.error_count, }) return {"accounts": accounts, "total": len(accounts)} @app.get("/") async def admin_panel(): from fastapi.responses import HTMLResponse return HTMLResponse(content=ADMIN_HTML) @app.on_event("startup") async def startup(): for acc in config.accounts: manager.add_account( email=acc.email, password=acc.password, name=acc.name, proxy=acc.proxy, ) logger.info("Loaded %d accounts", len(config.accounts)) ADMIN_HTML = """ DS2API · 控制台
浏览器模式
账号 活跃 可用 在线 排队

接口测试

/v1/chat/completions
响应

账号管理

邮箱备注登录状态禁言错误
加载中…

导入账号

格式:邮箱:密码 ,每行一个
""" def main(): import uvicorn uvicorn.run( app, host=config.server.host, port=config.server.port, ) if __name__ == "__main__": main()