# ds2api-browser / proxy.py
# Last commit shown by the repository viewer: 466f6e2 by nacho
#   "fix: comprehensive code-review fixes"
# File size at that commit: 11.9 kB
import hmac
import json
import logging
import os
import time
import uuid
from typing import Optional
from urllib.parse import quote

import httpx
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
# Process-wide logging setup: INFO threshold, with each record rendered as
# "<timestamp> [<LEVEL>] <logger name>: <message>".
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger for this module, named after the module's import name.
logger = logging.getLogger(__name__)
# The ASGI application; every route in this module registers itself on it.
app = FastAPI(title="DS2API Browser Proxy")

# CORS is fully open: any origin, any method, any header, credentials allowed.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins with allow_credentials=True — confirm this is intended.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
def _split_keys(raw: str) -> list[str]:
    """Split a comma-separated key list, trimming whitespace and dropping empty entries."""
    return [key.strip() for key in raw.split(",") if key.strip()]


# Base URL of the upstream DS2API instance. Trailing slashes are removed so
# that f"{DS2API_URL}/v1/..." never produces a double slash.
DS2API_URL = os.getenv("DS2API_UPSTREAM_URL", "http://127.0.0.1:5001").rstrip("/")

# Keys accepted from clients. DS2API_PROXY_KEYS takes precedence, then
# DS2API_KEYS, then the built-in default. Entries are trimmed and blanks are
# discarded, so "a, b," yields ["a", "b"] rather than ["a", " b", ""].
API_KEYS = _split_keys(
    os.getenv("DS2API_PROXY_KEYS", os.getenv("DS2API_KEYS", "sk-default"))
)

# Secret expected by the /admin/* routes.
ADMIN_KEY = os.getenv("DS2API_ADMIN_KEY", "admin")

# Internal key used to talk to the upstream DS2API instance. Falls back to the
# first client key, or to "sk-default" when no usable client key is configured.
_UPSTREAM_KEY = os.getenv("DS2API_UPSTREAM_KEY", API_KEYS[0] if API_KEYS else "sk-default")
class Message(BaseModel):
    """A single chat turn as it appears in an OpenAI-style request body."""

    # Speaker label; any string is accepted.
    role: str

    # Text of the turn; declared as a plain string.
    content: str
class ChatCompletionRequest(BaseModel):
    """Request body accepted by the ``/v1/chat/completions`` route."""

    # Model identifier requested by the client.
    model: str

    # Conversation turns, in order.
    messages: list[Message]

    # Whether the client wants a streamed response; off unless requested.
    stream: bool = False

    # Optional sampling temperature; None when the client did not send one.
    temperature: Optional[float] = None

    # Optional completion length cap; None when the client did not send one.
    max_tokens: Optional[int] = None
def verify_api_key(authorization: Optional[str] = Header(None)) -> str:
    """Validate a client credential and return the bare API key.

    The value may be ``Bearer <key>`` (scheme matched case-insensitively) or
    just ``<key>``.

    Args:
        authorization: Raw credential value, typically the ``Authorization``
            header.

    Returns:
        The key with any ``Bearer`` scheme and surrounding whitespace removed.

    Raises:
        HTTPException: 401 when the credential is missing, empty, or does not
            match any entry in ``API_KEYS``.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing API key")

    value = authorization.strip()
    # Only a *leading* scheme is removed; str.replace() would also delete
    # "Bearer " from the middle of a key.
    scheme, separator, remainder = value.partition(" ")
    if separator and scheme.lower() == "bearer":
        token = remainder.strip()
    else:
        token = value
    if not token:
        raise HTTPException(status_code=401, detail="Missing API key")

    # Constant-time comparison against every configured key (no early exit),
    # so response timing does not reveal how much of a key matched. Bytes are
    # used because compare_digest() rejects non-ASCII str arguments.
    candidate = token.encode("utf-8")
    matched = False
    for known_key in API_KEYS:
        if hmac.compare_digest(candidate, known_key.strip().encode("utf-8")):
            matched = True
    if not matched:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return token
def _upstream_headers() -> dict:
    """Return the headers that authenticate this proxy to the upstream DS2API."""
    bearer_value = "Bearer " + str(_UPSTREAM_KEY)
    return dict(Authorization=bearer_value)
async def _get_upstream_json(path: str) -> JSONResponse:
    """GET ``path`` from the upstream and relay its JSON body *and* status code.

    Raises:
        HTTPException: 502 when the upstream cannot be reached or does not
            answer with JSON.
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{DS2API_URL}{path}", headers=_upstream_headers())
    except httpx.HTTPError as exc:
        logger.warning("Upstream GET %s failed: %s", path, exc)
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    try:
        payload = resp.json()
    except ValueError as exc:
        logger.warning("Upstream GET %s returned a non-JSON body (HTTP %s)", path, resp.status_code)
        raise HTTPException(status_code=502, detail="Upstream returned a non-JSON response") from exc

    # Keep the upstream status so that errors are not reported to clients as 200.
    return JSONResponse(content=payload, status_code=resp.status_code)


@app.get("/v1/models")
async def list_models(authorization: Optional[str] = Header(None)):
    """List models by relaying the upstream ``/v1/models`` response.

    A missing ``Authorization`` header is reported as 401 by ``verify_api_key``.
    """
    verify_api_key(authorization)
    return await _get_upstream_json("/v1/models")


@app.get("/v1/models/{model_id}")
async def get_model(model_id: str, authorization: Optional[str] = Header(None)):
    """Describe one model by relaying the upstream ``/v1/models/{model_id}`` response."""
    verify_api_key(authorization)
    # Percent-encode the id so characters such as "?", "#" or "/" cannot
    # change which upstream URL is requested.
    return await _get_upstream_json(f"/v1/models/{quote(model_id, safe='')}")
@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    authorization: Optional[str] = Header(None),
):
    """Forward an OpenAI-style chat completion request to the upstream DS2API.

    With ``stream`` set, upstream lines are relayed as ``text/event-stream``;
    otherwise the upstream JSON body is returned with the upstream status code.

    Raises:
        HTTPException: 401 for a bad or missing key, 400 when ``messages`` is
            empty, 502 when a non-streaming upstream call fails or does not
            return JSON.
    """
    verify_api_key(authorization)
    if not request.messages:
        raise HTTPException(status_code=400, detail="No messages provided")

    upstream_url = f"{DS2API_URL}/v1/chat/completions"
    # Unset optional fields are omitted rather than sent as explicit nulls.
    payload = request.model_dump(exclude_none=True)

    def error_event(message: str) -> str:
        """Format an error as one SSE data event."""
        body = {"error": {"message": message, "type": "upstream_error"}}
        return f"data: {json.dumps(body)}\n\n"

    if request.stream:

        async def stream_with_cleanup():
            # The client is created inside the generator so that it stays open
            # for as long as the response is being streamed.
            try:
                async with httpx.AsyncClient() as stream_client:
                    async with stream_client.stream(
                        "POST",
                        upstream_url,
                        json=payload,
                        headers=_upstream_headers(),
                        timeout=120,
                    ) as resp:
                        if resp.status_code >= 400:
                            await resp.aread()
                            logger.warning("Upstream chat stream returned HTTP %s", resp.status_code)
                            yield error_event(f"Upstream returned HTTP {resp.status_code}")
                            return
                        async for line in resp.aiter_lines():
                            # aiter_lines() strips line endings; put one back
                            # so blank separator lines keep SSE framing intact.
                            yield line + "\n"
            except httpx.HTTPError as exc:
                # Response headers are already sent, so report the failure in-band.
                logger.warning("Upstream chat stream failed: %s", exc)
                yield error_event("Upstream request failed")

        return StreamingResponse(
            stream_with_cleanup(),
            media_type="text/event-stream",
        )

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                upstream_url,
                json=payload,
                headers=_upstream_headers(),
                timeout=120,
            )
    except httpx.HTTPError as exc:
        logger.warning("Upstream chat completion failed: %s", exc)
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    try:
        result = resp.json()
    except ValueError as exc:
        raise HTTPException(status_code=502, detail="Upstream returned a non-JSON response") from exc
    # Preserve the upstream status so errors are not reported as 200.
    return JSONResponse(content=result, status_code=resp.status_code)
@app.get("/anthropic/v1/models")
async def anthropic_models(
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None),
):
    """Return a fixed list of Claude model ids in an OpenAI-style list envelope.

    The key may be sent as ``Authorization: Bearer <key>`` or in the
    ``x-api-key`` header.

    Raises:
        HTTPException: 401 when no valid key is supplied.
    """
    verify_api_key(authorization or x_api_key)

    # One timestamp for the whole response so all entries agree.
    created = int(time.time())
    model_ids = ("claude-sonnet-4-6", "claude-opus-4-6", "claude-haiku-4-5")
    return {
        "data": [
            {"id": model_id, "object": "model", "created": created, "owned_by": "anthropic"}
            for model_id in model_ids
        ],
        "object": "list",
    }
def _anthropic_text(content) -> str:
    """Flatten Anthropic content (a string or a list of content blocks) to text.

    Blocks without a string ``text`` field (images, tool calls, ...) are skipped.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    pieces = []
    for block in content:
        if isinstance(block, str):
            pieces.append(block)
        elif isinstance(block, dict) and isinstance(block.get("text"), str):
            pieces.append(block["text"])
    return "\n".join(pieces)


def _anthropic_choice_text(data, field: str) -> str:
    """Return ``choices[0][field]["content"]`` from an OpenAI-style payload, or ""."""
    if not isinstance(data, dict):
        return ""
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
        return ""
    holder = choices[0].get(field)
    if not isinstance(holder, dict):
        return ""
    text = holder.get("content")
    return text if isinstance(text, str) else ""


def _anthropic_sse(event: str, payload: dict) -> str:
    """Format one named server-sent event."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


@app.post("/anthropic/v1/messages")
async def anthropic_messages(
    request: Request,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None),
):
    """Serve an Anthropic-style Messages request via the upstream chat API.

    The conversation (optional ``system`` text plus every message) is converted
    to OpenAI-style messages and sent to the upstream as model
    ``deepseek-flash``. The reply is returned as an Anthropic message object,
    or, with ``stream`` set, as the Anthropic event sequence ``message_start``,
    ``content_block_start``, ``content_block_delta``..., ``content_block_stop``,
    ``message_delta``, ``message_stop``.

    The key may be sent as ``Authorization: Bearer <key>`` or ``x-api-key``.
    Token counts in ``usage`` are whitespace word counts, not tokenizer counts.

    Raises:
        HTTPException: 401 for a bad or missing key, 400 for a malformed body
            or an empty conversation, 502 when the non-streaming upstream call
            fails.
    """
    verify_api_key(authorization or x_api_key)

    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    messages = body.get("messages", [])
    model = body.get("model", "claude-sonnet-4-6")
    stream = body.get("stream", False)
    if not messages or not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="No messages provided")

    # Translate the whole conversation, not just the final turn.
    upstream_messages = []
    system_text = _anthropic_text(body.get("system"))
    if system_text:
        upstream_messages.append({"role": "system", "content": system_text})
    for entry in messages:
        if not isinstance(entry, dict):
            continue
        role = entry.get("role")
        if role not in ("user", "assistant", "system"):
            role = "user"
        upstream_messages.append({"role": role, "content": _anthropic_text(entry.get("content"))})
    if not upstream_messages:
        raise HTTPException(status_code=400, detail="No messages provided")

    prompt_words = sum(len(item["content"].split()) for item in upstream_messages)
    upstream_url = f"{DS2API_URL}/v1/chat/completions"
    payload = {"model": "deepseek-flash", "messages": upstream_messages, "stream": bool(stream)}
    message_id = f"msg_{uuid.uuid4().hex[:8]}"

    if stream:

        async def stream_with_cleanup():
            collected = []
            yield _anthropic_sse(
                "message_start",
                {
                    "type": "message_start",
                    "message": {
                        "id": message_id,
                        "type": "message",
                        "role": "assistant",
                        "model": model,
                        "content": [],
                        "stop_reason": None,
                        "stop_sequence": None,
                        "usage": {"input_tokens": prompt_words, "output_tokens": 0},
                    },
                },
            )
            yield _anthropic_sse(
                "content_block_start",
                {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
            )
            try:
                async with httpx.AsyncClient() as stream_client:
                    async with stream_client.stream(
                        "POST",
                        upstream_url,
                        json=payload,
                        headers=_upstream_headers(),
                        timeout=120,
                    ) as resp:
                        if resp.status_code >= 400:
                            await resp.aread()
                            logger.warning("Upstream stream returned HTTP %s", resp.status_code)
                            yield _anthropic_sse(
                                "error",
                                {
                                    "type": "error",
                                    "error": {
                                        "type": "api_error",
                                        "message": f"Upstream returned HTTP {resp.status_code}",
                                    },
                                },
                            )
                            return
                        async for line in resp.aiter_lines():
                            if not line.startswith("data: "):
                                continue
                            data_str = line[6:].strip()
                            if data_str == "[DONE]":
                                continue
                            try:
                                chunk = json.loads(data_str)
                            except json.JSONDecodeError:
                                logger.debug("Skipping non-JSON upstream stream line")
                                continue
                            text = _anthropic_choice_text(chunk, "delta")
                            if text:
                                collected.append(text)
                                yield _anthropic_sse(
                                    "content_block_delta",
                                    {
                                        "type": "content_block_delta",
                                        "index": 0,
                                        "delta": {"type": "text_delta", "text": text},
                                    },
                                )
            except httpx.HTTPError as exc:
                # Headers are already sent; report the failure in-band.
                logger.warning("Upstream stream failed: %s", exc)
                yield _anthropic_sse(
                    "error",
                    {"type": "error", "error": {"type": "api_error", "message": "Upstream request failed"}},
                )
                return

            yield _anthropic_sse("content_block_stop", {"type": "content_block_stop", "index": 0})
            yield _anthropic_sse(
                "message_delta",
                {
                    "type": "message_delta",
                    "delta": {"stop_reason": "end_turn", "stop_sequence": None},
                    "usage": {"output_tokens": len("".join(collected).split())},
                },
            )
            yield _anthropic_sse("message_stop", {"type": "message_stop"})

        return StreamingResponse(
            stream_with_cleanup(),
            media_type="text/event-stream",
        )

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                upstream_url,
                json=payload,
                headers=_upstream_headers(),
                timeout=120,
            )
    except httpx.HTTPError as exc:
        logger.warning("Upstream request failed: %s", exc)
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc
    if resp.status_code >= 400:
        # Do not disguise an upstream failure as an empty assistant reply.
        raise HTTPException(status_code=502, detail=f"Upstream returned HTTP {resp.status_code}")
    try:
        data = resp.json()
    except ValueError as exc:
        raise HTTPException(status_code=502, detail="Upstream returned a non-JSON response") from exc

    content = _anthropic_choice_text(data, "message")
    return {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "model": model,
        "content": [{"type": "text", "text": content}],
        "stop_reason": "end_turn",
        "stop_sequence": None,
        "usage": {
            "input_tokens": prompt_words,
            "output_tokens": len(content.split()),
        },
    }
def _gemini_entry_text(entry) -> str:
    """Join the ``text`` of every part in a Gemini content entry; "" if none."""
    if not isinstance(entry, dict):
        return ""
    parts = entry.get("parts")
    if not isinstance(parts, list):
        return ""
    return "\n".join(
        part["text"]
        for part in parts
        if isinstance(part, dict) and isinstance(part.get("text"), str)
    )


def _gemini_choice_text(data, field: str) -> str:
    """Return ``choices[0][field]["content"]`` from an OpenAI-style payload, or ""."""
    if not isinstance(data, dict):
        return ""
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
        return ""
    holder = choices[0].get(field)
    if not isinstance(holder, dict):
        return ""
    text = holder.get("content")
    return text if isinstance(text, str) else ""


async def _gemini_prepare(request: Request, credential: Optional[str]) -> tuple:
    """Authenticate, parse the body and build the upstream message list.

    Returns:
        ``(messages, prompt_words)``: OpenAI-style messages for the whole
        conversation and a whitespace word count of their text.

    Raises:
        HTTPException: 401 for a bad or missing key, 400 for a malformed body
            or missing ``contents``.
    """
    verify_api_key(credential)
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    contents = body.get("contents", [])
    if not contents or not isinstance(contents, list):
        raise HTTPException(status_code=400, detail="No contents provided")

    messages = []
    system_text = _gemini_entry_text(body.get("systemInstruction") or body.get("system_instruction"))
    if system_text:
        messages.append({"role": "system", "content": system_text})
    for entry in contents:
        # Gemini labels assistant turns "model"; everything else is sent as "user".
        is_model_turn = isinstance(entry, dict) and entry.get("role") == "model"
        messages.append(
            {"role": "assistant" if is_model_turn else "user", "content": _gemini_entry_text(entry)}
        )
    prompt_words = sum(len(item["content"].split()) for item in messages)
    return messages, prompt_words


@app.post("/v1beta/models/{model}:generateContent")
async def gemini_generate(
    model: str,
    request: Request,
    authorization: Optional[str] = Header(None),
    x_goog_api_key: Optional[str] = Header(None),
    key: Optional[str] = None,
):
    """Serve a Gemini-style ``generateContent`` call via the upstream chat API.

    The key may be sent as ``Authorization: Bearer <key>``, in the
    ``x-goog-api-key`` header, or as the ``key`` query parameter. The upstream
    is always called with model ``deepseek-flash``; the ``model`` path segment
    is not used. Token counts are whitespace word counts.

    Raises:
        HTTPException: 401/400 as raised during request preparation, 502 when
            the upstream call fails.
    """
    messages, prompt_words = await _gemini_prepare(
        request, authorization or x_goog_api_key or key
    )
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{DS2API_URL}/v1/chat/completions",
                json={"model": "deepseek-flash", "messages": messages, "stream": False},
                headers=_upstream_headers(),
                timeout=120,
            )
    except httpx.HTTPError as exc:
        logger.warning("Upstream request failed: %s", exc)
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc
    if resp.status_code >= 400:
        # Do not disguise an upstream failure as an empty candidate.
        raise HTTPException(status_code=502, detail=f"Upstream returned HTTP {resp.status_code}")
    try:
        data = resp.json()
    except ValueError as exc:
        raise HTTPException(status_code=502, detail="Upstream returned a non-JSON response") from exc

    content = _gemini_choice_text(data, "message")
    reply_words = len(content.split())
    return {
        "candidates": [
            {
                "content": {
                    "parts": [{"text": content}],
                    "role": "model",
                },
                "finishReason": "STOP",
            }
        ],
        "usageMetadata": {
            "promptTokenCount": prompt_words,
            "candidatesTokenCount": reply_words,
            "totalTokenCount": prompt_words + reply_words,
        },
    }


@app.post("/v1beta/models/{model}:streamGenerateContent")
async def gemini_stream_generate(
    model: str,
    request: Request,
    authorization: Optional[str] = Header(None),
    x_goog_api_key: Optional[str] = Header(None),
    key: Optional[str] = None,
):
    """Serve a Gemini-style ``streamGenerateContent`` call as server-sent events.

    Each upstream text delta becomes one ``data:`` event holding a candidate
    chunk; a final event carries ``finishReason`` and word-count based usage.
    Authentication and request translation match ``generateContent``.

    Raises:
        HTTPException: 401/400 as raised during request preparation. Upstream
            failures after the stream has started are reported in-band.
    """
    messages, prompt_words = await _gemini_prepare(
        request, authorization or x_goog_api_key or key
    )

    async def stream_with_cleanup():
        collected = []
        try:
            async with httpx.AsyncClient() as stream_client:
                async with stream_client.stream(
                    "POST",
                    f"{DS2API_URL}/v1/chat/completions",
                    json={"model": "deepseek-flash", "messages": messages, "stream": True},
                    headers=_upstream_headers(),
                    timeout=120,
                ) as resp:
                    if resp.status_code >= 400:
                        await resp.aread()
                        logger.warning("Upstream stream returned HTTP %s", resp.status_code)
                        failure = {"error": {"code": 502, "message": f"Upstream returned HTTP {resp.status_code}"}}
                        yield f"data: {json.dumps(failure)}\n\n"
                        return
                    async for line in resp.aiter_lines():
                        if not line.startswith("data: "):
                            continue
                        data_str = line[6:].strip()
                        if data_str == "[DONE]":
                            continue
                        try:
                            chunk = json.loads(data_str)
                        except json.JSONDecodeError:
                            logger.debug("Skipping non-JSON upstream stream line")
                            continue
                        text = _gemini_choice_text(chunk, "delta")
                        if text:
                            collected.append(text)
                            yield f"data: {json.dumps({'candidates': [{'content': {'parts': [{'text': text}], 'role': 'model'}}]})}\n\n"
        except httpx.HTTPError as exc:
            # Headers are already sent; report the failure in-band.
            logger.warning("Upstream stream failed: %s", exc)
            failure = {"error": {"code": 502, "message": "Upstream request failed"}}
            yield f"data: {json.dumps(failure)}\n\n"
            return

        reply_words = len("".join(collected).split())
        closing = {
            "candidates": [{"content": {"parts": [], "role": "model"}, "finishReason": "STOP"}],
            "usageMetadata": {
                "promptTokenCount": prompt_words,
                "candidatesTokenCount": reply_words,
                "totalTokenCount": prompt_words + reply_words,
            },
        }
        yield f"data: {json.dumps(closing)}\n\n"

    return StreamingResponse(
        stream_with_cleanup(),
        media_type="text/event-stream",
    )
@app.get("/api/version")
async def ollama_version():
    """Answer the version route with a fixed version string (no authentication)."""
    version_info = dict(version="0.1.0")
    return version_info
@app.get("/api/tags")
async def ollama_tags():
    """Return the fixed model catalogue for the tags route (no authentication)."""
    advertised = ("deepseek-chat", "deepseek-reasoner")
    # Each entry repeats the same identifier under both "name" and "model".
    catalogue = [{"name": label, "model": label} for label in advertised]
    return {"models": catalogue}
@app.post("/api/show")
async def ollama_show(request: Request):
    """Describe a model: echo the requested id with a fixed capability list.

    The ``model`` field of the JSON body is echoed back as ``id`` and defaults
    to ``deepseek-chat`` when absent.

    Raises:
        HTTPException: 400 when the body is not a JSON object.
    """
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        # A JSON array or scalar has no .get(); reject it as a client error.
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    model = body.get("model", "deepseek-chat")
    return {
        "id": model,
        "capabilities": ["tools", "thinking"],
    }
@app.get("/healthz")
async def healthz():
    """Liveness probe: always reports ``ok`` without checking anything."""
    liveness = dict(status="ok")
    return liveness
@app.get("/readyz")
async def readyz():
    """Readiness probe: returns a fixed ``ok`` status with hard-coded account counts."""
    account_summary = {"total": 1, "in_use": 0, "available": 1}
    return {"status": "ok", "accounts": account_summary}
@app.get("/admin/stats")
async def admin_stats(admin_key: str = Header(...)):
    """Return fixed pool statistics to callers presenting the admin key.

    Args:
        admin_key: Value of the required ``admin-key`` request header.

    Raises:
        HTTPException: 401 when the header does not equal ``ADMIN_KEY``.
    """
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched; bytes are used because compare_digest() rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(admin_key.encode("utf-8"), ADMIN_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid admin key")
    return {"total": 1, "in_use": 0, "available": 1, "logged_in": 1, "queue_size": 0}
@app.get("/admin/config")
async def get_config(admin_key: str = Header(...)):
    """Return a fixed configuration summary to callers presenting the admin key.

    Args:
        admin_key: Value of the required ``admin-key`` request header.

    Raises:
        HTTPException: 401 when the header does not equal ``ADMIN_KEY``.
    """
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched; bytes are used because compare_digest() rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(admin_key.encode("utf-8"), ADMIN_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid admin key")
    return {
        "server": {"host": "0.0.0.0", "port": 5002},
        "browser": {"headless": True, "max_concurrent_per_account": 1, "timeout": 60000},
        "default_proxy": None,
        "account_count": 1,
    }
def main():
    """Serve ``app`` with uvicorn on all interfaces.

    The port is read from ``DS2API_PROXY_PORT`` and defaults to 5002.
    """
    # Imported here so that importing this module does not require uvicorn.
    import uvicorn

    listen_port = int(os.getenv("DS2API_PROXY_PORT", "5002"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()