# duck/server.py — Duck.ai 2API: an OpenAI-compatible FastAPI proxy over DuckAIClient.
import json
import os
import time
import uuid
import logging
import asyncio
from typing import Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from duck_client import DuckAIClient
# Root logging setup: timestamped, level-tagged lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# Path of the optional JSON config file; overridable via the CONFIG_FILE env var.
CONFIG_FILE = os.environ.get("CONFIG_FILE", "config.json")
def load_config() -> dict:
    """Load the JSON config file named by CONFIG_FILE.

    Returns an empty dict when the file is missing, unreadable, or contains
    invalid JSON, so the server can still start from environment variables
    alone instead of dying at import time.
    """
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, json.JSONDecodeError) as e:
        # A broken config should not abort startup; make the problem visible
        # and fall back to env-only configuration.
        logger.warning("Could not load %s: %s", CONFIG_FILE, e)
        return {}
# Config values loaded once at import; env vars take precedence (see env_or_config).
config = load_config()
def env_or_config(env_name: str, config_key: str, default=None, cast=None):
    """Resolve one setting: environment variable first, then config file, then default.

    ``cast`` converts the raw value. ``bool`` gets special handling so string
    forms like "1"/"true"/"yes"/"on" (any case, surrounding whitespace ignored)
    map to True; ``int`` coerces via int(). Returns None when nothing is found.
    """
    raw = os.environ.get(env_name)
    if raw is None:
        raw = config.get(config_key, default)
    if raw is None:
        return None
    if cast is bool:
        if isinstance(raw, bool):
            return raw
        return str(raw).strip().lower() in {"1", "true", "yes", "on"}
    if cast is int:
        return int(raw)
    return cast(raw) if cast else raw
# Bearer token clients must present; a falsy value disables auth (see verify_auth).
API_KEY = env_or_config("API_KEY", "api_key", "sk-duck-ai")
# Optional upstream proxy for the Duck.ai client.
PROXY = env_or_config("PROXY", "proxy", None)
HOST = env_or_config("HOST", "host", "0.0.0.0")
PORT = env_or_config("PORT", "port", 7860, cast=int)
# Duck.ai model used when a request names no model or an unknown one (see map_model).
DEFAULT_MODEL = env_or_config("DEFAULT_MODEL", "default_model", "claude-haiku-4-5")
ASSISTANT_NAME = env_or_config("ASSISTANT_NAME", "assistant_name", None)
SYSTEM_PROMPT = env_or_config("SYSTEM_PROMPT", "system_prompt", None)
# Server-wide default for web search; requests may override via the "web_search" field.
WEB_SEARCH = env_or_config("WEB_SEARCH", "web_search", True, cast=bool)
POOL_SIZE = env_or_config("POOL_SIZE", "pool_size", 2, cast=int)
# Single shared client with page pool
_shared_client: Optional[DuckAIClient] = None
# Serializes lazy creation of the shared client (see get_client).
_client_lock = asyncio.Lock()
async def get_client() -> DuckAIClient:
    """Return the process-wide DuckAIClient, creating it lazily on first use.

    Creation runs under _client_lock so concurrent first requests build
    exactly one client.
    """
    global _shared_client
    async with _client_lock:
        if _shared_client is not None:
            return _shared_client
        _shared_client = DuckAIClient(
            proxy=PROXY,
            model=DEFAULT_MODEL,
            assistant_name=ASSISTANT_NAME,
            system_prompt=SYSTEM_PROMPT,
            pool_size=POOL_SIZE,
        )
        return _shared_client
async def return_client(c: DuckAIClient):
    """No-op: the shared client is never checked back in; kept for call-site symmetry."""
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: log effective settings at startup, then dispose
    of the shared Duck.ai client (if one was created) at shutdown."""
    global _shared_client
    logger.info("Duck.ai 2API server starting on %s:%s", HOST, PORT)
    logger.info("Default model: %s", DEFAULT_MODEL)
    logger.info("Web search: %s", WEB_SEARCH)
    logger.info("Pool size: %s", POOL_SIZE)
    logger.info("Proxy: %s", PROXY or "None")
    yield
    # Shutdown: release whatever resources the shared client holds.
    if _shared_client is not None:
        await _shared_client.close()
        _shared_client = None
    logger.info("Server shutdown complete")
# FastAPI application; lifespan handles startup logging and shutdown cleanup.
app = FastAPI(title="Duck.ai 2API", lifespan=lifespan)
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# disallowed by the CORS spec (browsers reject the wildcard when credentials
# are allowed) — confirm whether credentialed cross-origin use is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def verify_auth(request: Request):
    """Validate the request's Authorization header against API_KEY.

    A falsy API_KEY disables authentication entirely. Both a bare token and
    the "Bearer <token>" form are accepted. Raises HTTPException(401) on
    mismatch.
    """
    if not API_KEY:
        return
    supplied = request.headers.get("Authorization", "")
    token = supplied[7:] if supplied.startswith("Bearer ") else supplied
    if token != API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")
# Maps client-requested (OpenAI-style) model names to the Duck.ai model ids
# actually used upstream; several legacy aliases collapse onto the nearest
# supported model. Unknown names fall back to DEFAULT_MODEL via map_model.
MODEL_MAP = {
    "claude-haiku-4-5": "claude-haiku-4-5",
    "claude-3-haiku": "claude-haiku-4-5",
    "claude-3-5-haiku": "claude-haiku-4-5",
    "claude-3-haiku-20240307": "claude-haiku-4-5",
    "gpt-4o-mini": "gpt-4o-mini",
    "gpt-4o": "gpt-4o-mini",
    "gpt-3.5-turbo": "gpt-4o-mini",
    "llama-3.3-70b": "meta-llama/Llama-3.3-70B-Instruct-Turbo",
    "mixtral-8x7b": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "o3-mini": "o3-mini",
}
def map_model(model: str) -> str:
    """Translate a requested model name to a Duck.ai model id, falling back
    to DEFAULT_MODEL for names not present in MODEL_MAP."""
    try:
        return MODEL_MAP[model]
    except KeyError:
        return DEFAULT_MODEL
@app.get("/v1/models")
@app.get("/models")
async def list_models(request: Request):
    """OpenAI-compatible listing of the models this proxy exposes."""
    verify_auth(request)
    catalog = [
        ("claude-haiku-4-5", "anthropic"),
        ("gpt-4o-mini", "openai"),
        ("o3-mini", "openai"),
        ("meta-llama/Llama-3.3-70B-Instruct-Turbo", "meta"),
        ("mistralai/Mixtral-8x7B-Instruct-v0.1", "mistral"),
    ]
    return {
        "object": "list",
        "data": [
            {"id": model_id, "object": "model", "owned_by": owner}
            for model_id, owner in catalog
        ],
    }
@app.post("/v1/chat/completions")
@app.post("/chat/completions")
async def chat_completions(request: Request):
    """OpenAI-compatible chat completions endpoint.

    Validates the bearer token and JSON body, maps the requested model to a
    Duck.ai model id, then dispatches to the SSE streaming path or the
    buffered non-streaming path.
    """
    verify_auth(request)
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body")
    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="messages is required")
    req_model = body.get("model", DEFAULT_MODEL)
    duck_model = map_model(req_model)
    stream = body.get("stream", False)
    # Per-request override of the server-wide web-search default.
    web_search = body.get("web_search", WEB_SEARCH)
    custom_instructions = None
    assistant_name = ASSISTANT_NAME
    # Only the first system message is forwarded as custom instructions.
    for msg in messages:
        if msg.get("role") == "system":
            custom_instructions = msg.get("content", "")
            break
    completion_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
    created = int(time.time())
    client = await get_client()
    # NOTE(review): mutating the shared client's model is racy when concurrent
    # requests ask for different models — confirm whether DuckAIClient supports
    # a per-call model override instead.
    client.model = duck_model
    if stream:
        return StreamingResponse(
            _stream_response(
                client,
                messages,
                web_search,
                custom_instructions,
                assistant_name,
                completion_id,
                created,
                req_model,  # echo the client's requested name, not the mapped id
            ),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # disable nginx buffering so chunks flush
            },
        )
    return await _non_stream_response(
        client,
        messages,
        web_search,
        custom_instructions,
        assistant_name,
        completion_id,
        created,
        req_model,
    )
async def _stream_response(client: DuckAIClient, messages: list,
                           web_search: bool, custom_instructions: str,
                           assistant_name: str, completion_id: str,
                           created: int, model: str):
    """Yield an OpenAI-style SSE stream for one chat completion.

    Translates DuckAIClient stream events into "chat.completion.chunk" SSE
    frames, appends a markdown block of collected search sources, and always
    terminates with a finish_reason="stop" chunk plus the [DONE] sentinel.
    Upstream errors are reported in-band as a final content chunk rather than
    raised, so the SSE stream stays well-formed for the client.
    """

    def sse(delta: dict, finish_reason=None) -> str:
        # Build one chat.completion.chunk frame in SSE wire format.
        # (Previously this dict literal was repeated five times.)
        frame = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }],
        }
        return f"data: {json.dumps(frame)}\n\n"

    try:
        # Leading chunk establishes the assistant role, per OpenAI convention.
        yield sse({"role": "assistant", "content": ""})
        search_sources = []
        async for event in client.chat_stream(
            messages=messages,
            web_search=web_search,
            custom_instructions=custom_instructions,
            assistant_name=assistant_name,
        ):
            etype = event.get("type")
            if etype == "text":
                text = event.get("data", "")
                if text and isinstance(text, str):
                    yield sse({"content": text})
            elif etype == "message":
                # "message" data may be a dict (with "message"/"content") or a str.
                data = event.get("data", {})
                text = ""
                if isinstance(data, dict):
                    text = data.get("message", data.get("content", ""))
                elif isinstance(data, str):
                    text = data
                if text and isinstance(text, str):
                    yield sse({"content": text})
            elif etype == "search_source":
                src = event.get("data", {})
                if isinstance(src, dict) and src.get("url"):
                    search_sources.append(src)
            elif etype in ("search_begin", "search_results", "search_end"):
                # Search lifecycle markers carry no client-visible content.
                pass
            elif etype == "done":
                break
            elif etype == "event":
                data = event.get("data", {})
                if isinstance(data, dict):
                    msg = data.get("message", data.get("content", ""))
                    if msg and isinstance(msg, str):
                        yield sse({"content": msg})
        if search_sources:
            # Emit up to 8 collected sources as a markdown reference list.
            refs = "\n\n---\n**搜索结果:**\n"
            for i, src in enumerate(search_sources[:8], 1):
                title = src.get("title", "")
                url = src.get("url", "")
                site = src.get("site", "")
                favicon = f"https://www.google.com/s2/favicons?domain={site}&sz=32" if site else ""
                if title and url:
                    icon = f"![favicon]({favicon}) " if favicon else ""
                    refs += f"{i}. {icon}[{title}]({url}) - {site}\n"
            yield sse({"content": refs})
        yield sse({}, finish_reason="stop")
        yield "data: [DONE]\n\n"
    except Exception as e:
        logger.error("Stream error: %s", e, exc_info=True)
        # Surface the error in-band; the HTTP response has already started.
        yield sse({"content": f"\n\n[Error: {str(e)}]"}, finish_reason="stop")
        yield "data: [DONE]\n\n"
    finally:
        await return_client(client)
async def _non_stream_response(client: DuckAIClient, messages: list,
                               web_search: bool, custom_instructions: str,
                               assistant_name: str, completion_id: str,
                               created: int, model: str):
    """Collect the full Duck.ai reply and return one OpenAI chat.completion body.

    Accumulates text from the client's event stream, appends a markdown block
    of collected search sources, and reports rough (chars/4) token usage.
    Raises HTTPException(500) if the upstream stream fails.
    """
    full_content = ""
    search_sources = []
    try:
        async for event in client.chat_stream(
            messages=messages,
            web_search=web_search,
            custom_instructions=custom_instructions,
            assistant_name=assistant_name,
        ):
            etype = event.get("type")
            if etype == "text":
                val = event.get("data", "")
                if isinstance(val, str):
                    full_content += val
            elif etype == "message":
                # "message" data may be a dict (with "message"/"content") or a str.
                data = event.get("data", {})
                if isinstance(data, dict):
                    msg = data.get("message", data.get("content", ""))
                    if isinstance(msg, str):
                        full_content += msg
                elif isinstance(data, str):
                    full_content += data
            elif etype == "search_source":
                src = event.get("data", {})
                if isinstance(src, dict) and src.get("url"):
                    search_sources.append(src)
            elif etype == "event":
                data = event.get("data", {})
                if isinstance(data, dict):
                    msg = data.get("message", data.get("content", ""))
                    if isinstance(msg, str):
                        full_content += msg
            elif etype == "done":
                break
    except Exception as e:
        logger.error("Chat error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        await return_client(client)
    if search_sources:
        # Emit up to 8 collected sources as a markdown reference list.
        refs = "\n\n---\n**搜索结果:**\n"
        for i, src in enumerate(search_sources[:8], 1):
            title = src.get("title", "")
            url = src.get("url", "")
            site = src.get("site", "")
            favicon = f"https://www.google.com/s2/favicons?domain={site}&sz=32" if site else ""
            if title and url:
                icon = f"![favicon]({favicon}) " if favicon else ""
                refs += f"{i}. {icon}[{title}]({url}) - {site}\n"
        full_content += refs
    # Rough usage estimate (~4 chars per token). Message "content" may legally
    # be None or a list of content parts (OpenAI multimodal shape); the old
    # len(m.get("content", "")) raised TypeError on None, so count only plain
    # string content.
    prompt_tokens = sum(
        len(c) for c in (m.get("content") for m in messages) if isinstance(c, str)
    ) // 4
    completion_tokens = len(full_content) // 4
    return JSONResponse({
        "id": completion_id,
        "object": "chat.completion",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": full_content},
            "finish_reason": "stop",
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    })
@app.get("/")
async def root():
    """Service banner with pointers to the docs and health endpoints."""
    banner = {"service": "Duck.ai 2API", "status": "running"}
    banner["model"] = DEFAULT_MODEL
    banner["docs"] = "/docs"
    banner["health"] = "/health"
    return banner
@app.get("/health")
async def health():
    """Liveness probe: report pool status, or 503 when the client cannot start."""
    try:
        pool_client = await get_client()
        # pool_status() stays inside the try so its failures also map to 503.
        return {
            "status": "ok",
            "model": DEFAULT_MODEL,
            "pool": pool_client.pool_status(),
        }
    except Exception as e:
        return JSONResponse(
            status_code=503,
            content={"status": "error", "detail": str(e)},
        )
# Local/dev entry point; production deployments can point uvicorn at
# "server:app" directly instead.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("server:app", host=HOST, port=PORT, reload=False)