"""OpenAI-compatible API server that proxies chat completions to Duck.ai.

Exposes /v1/models and /v1/chat/completions (stream and non-stream) backed
by a single shared DuckAIClient with an internal page pool.
"""

import asyncio
import json
import logging
import os
import secrets
import time
import uuid
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

from duck_client import DuckAIClient

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

CONFIG_FILE = os.environ.get("CONFIG_FILE", "config.json")


def load_config() -> dict:
    """Load the JSON config file; return {} when it is missing or malformed."""
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        # A malformed config file previously crashed the server at import
        # time; degrade to defaults and tell the operator instead.
        logger.warning("Config file %s is not valid JSON (%s); using defaults", CONFIG_FILE, e)
        return {}


config = load_config()


def env_or_config(env_name: str, config_key: str, default=None, cast=None):
    """Resolve a setting: environment variable first, then config file, then default.

    ``cast=bool`` accepts the usual truthy strings ("1", "true", "yes", "on",
    case-insensitive); any other ``cast`` callable is applied to the raw value.
    Returns None when the setting is absent everywhere.
    """
    value = os.environ.get(env_name)
    if value is None:
        value = config.get(config_key, default)
    if value is None:
        return None
    if cast is bool:
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in {"1", "true", "yes", "on"}
    if cast is int:
        return int(value)
    if cast:
        return cast(value)
    return value


API_KEY = env_or_config("API_KEY", "api_key", "sk-duck-ai")
PROXY = env_or_config("PROXY", "proxy", None)
HOST = env_or_config("HOST", "host", "0.0.0.0")
PORT = env_or_config("PORT", "port", 7860, cast=int)
DEFAULT_MODEL = env_or_config("DEFAULT_MODEL", "default_model", "claude-haiku-4-5")
ASSISTANT_NAME = env_or_config("ASSISTANT_NAME", "assistant_name", None)
SYSTEM_PROMPT = env_or_config("SYSTEM_PROMPT", "system_prompt", None)
WEB_SEARCH = env_or_config("WEB_SEARCH", "web_search", True, cast=bool)
POOL_SIZE = env_or_config("POOL_SIZE", "pool_size", 2, cast=int)

# Single shared client with page pool
_shared_client: Optional[DuckAIClient] = None
_client_lock = asyncio.Lock()


async def get_client() -> DuckAIClient:
    """Return the lazily-created shared DuckAIClient (guarded by a lock)."""
    global _shared_client
    async with _client_lock:
        if _shared_client is None:
            _shared_client = DuckAIClient(
                proxy=PROXY,
                model=DEFAULT_MODEL,
                assistant_name=ASSISTANT_NAME,
                system_prompt=SYSTEM_PROMPT,
                pool_size=POOL_SIZE,
            )
        return _shared_client


async def return_client(c: DuckAIClient):
    """No-op: the shared client manages its own page pool internally."""
    return None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log startup settings; close the shared client on shutdown."""
    logger.info("Duck.ai 2API server starting on %s:%s", HOST, PORT)
    logger.info("Default model: %s", DEFAULT_MODEL)
    logger.info("Web search: %s", WEB_SEARCH)
    logger.info("Pool size: %s", POOL_SIZE)
    logger.info("Proxy: %s", PROXY or "None")
    yield
    global _shared_client
    if _shared_client:
        await _shared_client.close()
        _shared_client = None
    logger.info("Server shutdown complete")


app = FastAPI(title="Duck.ai 2API", lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def verify_auth(request: Request):
    """Validate the Authorization header against API_KEY; no-op when unset.

    Raises:
        HTTPException: 401 when the supplied token does not match.
    """
    if not API_KEY:
        return
    auth = request.headers.get("Authorization", "")
    token = auth[7:] if auth.startswith("Bearer ") else auth
    # Constant-time comparison: a plain != would leak matching-prefix
    # length via response timing.
    if not secrets.compare_digest(token, API_KEY):
        raise HTTPException(status_code=401, detail="Invalid API key")


# Aliases from commonly-requested OpenAI/Anthropic model names to the
# models Duck.ai actually serves. Unknown names fall back to DEFAULT_MODEL.
MODEL_MAP = {
    "claude-haiku-4-5": "claude-haiku-4-5",
    "claude-3-haiku": "claude-haiku-4-5",
    "claude-3-5-haiku": "claude-haiku-4-5",
    "claude-3-haiku-20240307": "claude-haiku-4-5",
    "gpt-4o-mini": "gpt-4o-mini",
    "gpt-4o": "gpt-4o-mini",
    "gpt-3.5-turbo": "gpt-4o-mini",
    "llama-3.3-70b": "meta-llama/Llama-3.3-70B-Instruct-Turbo",
    "mixtral-8x7b": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "o3-mini": "o3-mini",
}


def map_model(model: str) -> str:
    """Map a requested model name to a Duck.ai model, defaulting to DEFAULT_MODEL."""
    return MODEL_MAP.get(model, DEFAULT_MODEL)


@app.get("/v1/models")
@app.get("/models")
async def list_models(request: Request):
    """Return the supported model list in OpenAI /v1/models format."""
    verify_auth(request)
    models = [
        {"id": "claude-haiku-4-5", "object": "model", "owned_by": "anthropic"},
        {"id": "gpt-4o-mini", "object": "model", "owned_by": "openai"},
        {"id": "o3-mini", "object": "model", "owned_by": "openai"},
        {"id": "meta-llama/Llama-3.3-70B-Instruct-Turbo", "object": "model", "owned_by": "meta"},
        {"id": "mistralai/Mixtral-8x7B-Instruct-v0.1", "object": "model", "owned_by": "mistral"},
    ]
    return {"object": "list", "data": models}


@app.post("/v1/chat/completions")
@app.post("/chat/completions")
async def chat_completions(request: Request):
    """OpenAI-compatible chat completions endpoint (streaming and non-streaming).

    Raises:
        HTTPException: 400 on invalid JSON or missing messages, 401 on bad auth.
    """
    verify_auth(request)
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None

    messages = body.get("messages", [])
    if not messages:
        raise HTTPException(status_code=400, detail="messages is required")

    req_model = body.get("model", DEFAULT_MODEL)
    duck_model = map_model(req_model)
    stream = body.get("stream", False)
    web_search = body.get("web_search", WEB_SEARCH)

    # The first system message (if any) becomes Duck.ai custom instructions.
    custom_instructions = None
    assistant_name = ASSISTANT_NAME
    for msg in messages:
        if msg.get("role") == "system":
            custom_instructions = msg.get("content", "")
            break

    completion_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
    created = int(time.time())

    client = await get_client()
    # NOTE(review): assigning to the shared client's model is racy when two
    # in-flight requests ask for different models — TODO pass the model per
    # chat_stream call if DuckAIClient supports it.
    client.model = duck_model

    if stream:
        return StreamingResponse(
            _stream_response(
                client, messages, web_search, custom_instructions,
                assistant_name, completion_id, created, req_model,
            ),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                # Disable nginx response buffering so SSE chunks flush immediately.
                "X-Accel-Buffering": "no",
            },
        )

    return await _non_stream_response(
        client, messages, web_search, custom_instructions,
        assistant_name, completion_id, created, req_model,
    )


def _sse_chunk(completion_id: str, created: int, model: str,
               delta: dict, finish_reason=None) -> str:
    """Serialize one OpenAI chat.completion.chunk as an SSE 'data:' line."""
    payload = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "delta": delta,
            "finish_reason": finish_reason,
        }],
    }
    return f"data: {json.dumps(payload)}\n\n"


def _event_text(event: dict) -> str:
    """Extract streamed text from a chat_stream event, or "" when it carries none.

    "text" events carry a plain string; "message" events carry either a dict
    (keys "message"/"content") or a bare string; "event" events carry a dict
    only. All other event types yield "".
    """
    etype = event.get("type")
    data = event.get("data", "")
    if etype == "text":
        return data if isinstance(data, str) else ""
    if etype == "message":
        if isinstance(data, dict):
            msg = data.get("message", data.get("content", ""))
            return msg if isinstance(msg, str) else ""
        return data if isinstance(data, str) else ""
    if etype == "event":
        if isinstance(data, dict):
            msg = data.get("message", data.get("content", ""))
            return msg if isinstance(msg, str) else ""
        return ""
    return ""


def _format_search_refs(sources: list) -> str:
    """Render up to 8 collected search sources as a markdown reference list."""
    refs = "\n\n---\n**搜索结果:**\n"
    for i, src in enumerate(sources[:8], 1):
        title = src.get("title", "")
        url = src.get("url", "")
        site = src.get("site", "")
        favicon = f"https://www.google.com/s2/favicons?domain={site}&sz=32" if site else ""
        if title and url:
            icon = f"![favicon]({favicon}) " if favicon else ""
            refs += f"{i}. {icon}[{title}]({url}) - {site}\n"
    return refs


async def _stream_response(client: DuckAIClient, messages: list, web_search: bool,
                           custom_instructions: str, assistant_name: str,
                           completion_id: str, created: int, model: str):
    """Yield SSE chunks translating Duck.ai stream events to OpenAI format.

    Emits a role preamble chunk, content deltas, an appended search-source
    reference block (when any sources were seen), a stop chunk, and [DONE].
    Errors are surfaced in-band as a final content chunk so the client's SSE
    stream terminates cleanly.
    """
    try:
        # OpenAI clients expect an initial delta establishing the role.
        yield _sse_chunk(completion_id, created, model,
                         {"role": "assistant", "content": ""})

        search_sources = []
        async for event in client.chat_stream(
            messages=messages,
            web_search=web_search,
            custom_instructions=custom_instructions,
            assistant_name=assistant_name,
        ):
            etype = event.get("type")
            if etype == "done":
                break
            if etype == "search_source":
                src = event.get("data", {})
                if isinstance(src, dict) and src.get("url"):
                    search_sources.append(src)
                continue
            if etype in ("search_begin", "search_results", "search_end"):
                continue
            text = _event_text(event)
            if text:
                yield _sse_chunk(completion_id, created, model, {"content": text})

        if search_sources:
            yield _sse_chunk(completion_id, created, model,
                             {"content": _format_search_refs(search_sources)})

        yield _sse_chunk(completion_id, created, model, {}, finish_reason="stop")
        yield "data: [DONE]\n\n"
    except Exception as e:
        logger.error("Stream error: %s", e, exc_info=True)
        # Headers are already sent; report the error in-band and close cleanly.
        yield _sse_chunk(completion_id, created, model,
                         {"content": f"\n\n[Error: {str(e)}]"}, finish_reason="stop")
        yield "data: [DONE]\n\n"
    finally:
        await return_client(client)


async def _non_stream_response(client: DuckAIClient, messages: list, web_search: bool,
                               custom_instructions: str, assistant_name: str,
                               completion_id: str, created: int, model: str):
    """Collect the full Duck.ai response and return one OpenAI chat.completion.

    Raises:
        HTTPException: 500 when the upstream stream fails.
    """
    full_content = ""
    search_sources = []
    try:
        async for event in client.chat_stream(
            messages=messages,
            web_search=web_search,
            custom_instructions=custom_instructions,
            assistant_name=assistant_name,
        ):
            etype = event.get("type")
            if etype == "done":
                break
            if etype == "search_source":
                src = event.get("data", {})
                if isinstance(src, dict) and src.get("url"):
                    search_sources.append(src)
                continue
            full_content += _event_text(event)
    except Exception as e:
        logger.error("Chat error: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        await return_client(client)

    if search_sources:
        full_content += _format_search_refs(search_sources)

    # Rough chars/4 token estimate; skip non-string content (e.g. None or
    # multimodal part lists) rather than raising TypeError.
    prompt_tokens = sum(
        len(c) for c in (m.get("content", "") for m in messages) if isinstance(c, str)
    ) // 4
    completion_tokens = len(full_content) // 4

    return JSONResponse({
        "id": completion_id,
        "object": "chat.completion",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": full_content},
            "finish_reason": "stop",
        }],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    })


@app.get("/")
async def root():
    """Service banner with pointers to docs and health endpoints."""
    return {
        "service": "Duck.ai 2API",
        "status": "running",
        "model": DEFAULT_MODEL,
        "docs": "/docs",
        "health": "/health",
    }


@app.get("/health")
async def health():
    """Health probe: 200 with pool status, or 503 when the client cannot start."""
    client = None
    try:
        client = await get_client()
        return {
            "status": "ok",
            "model": DEFAULT_MODEL,
            "pool": client.pool_status(),
        }
    except Exception as e:
        return JSONResponse(
            status_code=503,
            content={"status": "error", "detail": str(e)},
        )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("server:app", host=HOST, port=PORT, reload=False)