|
|
""" |
|
|
Dual-Compatible API Endpoint (OpenAI + Anthropic) |
|
|
Lightweight CPU-based implementation for Hugging Face Spaces |
|
|
- OpenAI format: /v1/chat/completions |
|
|
- Anthropic format: /anthropic/v1/messages |
|
|
""" |
|
|
|
|
|
import os |
|
|
import time |
|
|
import uuid |
|
|
import logging |
|
|
import re |
|
|
from datetime import datetime |
|
|
from logging.handlers import RotatingFileHandler |
|
|
from typing import List, Optional, Union, Dict, Any, Literal |
|
|
from contextlib import asynccontextmanager |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Header, Request |
|
|
from fastapi.responses import StreamingResponse, JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel, Field |
|
|
import torch |
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer |
|
|
from threading import Thread |
|
|
import json |
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Logging: one rotating file under /tmp/logs plus the console.
# ---------------------------------------------------------------------------
LOG_DIR = "/tmp/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "api.log")

log_format = logging.Formatter(
    '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# The file keeps everything (DEBUG and up), rotating at 10 MiB with 5 backups.
file_handler = RotatingFileHandler(
    LOG_FILE, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
)
file_handler.setFormatter(log_format)
file_handler.setLevel(logging.DEBUG)

# The console only shows INFO and above.
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_format)
console_handler.setLevel(logging.INFO)

logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler])
logger = logging.getLogger("dual-api")

# Route uvicorn's loggers through the same two handlers.  Propagation is
# switched off because the root logger (and, for "uvicorn.error" and
# "uvicorn.access", the parent "uvicorn" logger) carries these very handlers:
# letting records bubble up would write every uvicorn line two or three times.
for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]:
    uv_log = logging.getLogger(uvicorn_logger)
    uv_log.handlers = [file_handler, console_handler]
    uv_log.propagate = False

logger.info("=" * 60)
logger.info(f"Dual API (OpenAI + Anthropic) Startup at {datetime.now().isoformat()}")
logger.info(f"Log file: {LOG_FILE}")
logger.info("=" * 60)
|
|
|
|
|
|
|
|
# Model repository id handed to `from_pretrained`, and the device string used
# both for `device_map` and for moving tokenized inputs.
MODEL_ID = "Qwen/Qwen2.5-Coder-3B-Instruct"
DEVICE = "cpu"

# Module-wide handles; both stay None until lifespan() has loaded them.
model = tokenizer = None
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the tokenizer and model on startup and release them on shutdown.

    The loaded objects are stored in the module-level ``model`` and
    ``tokenizer`` globals that the request handlers read.

    Raises:
        Exception: whatever ``from_pretrained`` raised; the failure is logged
            with a traceback and re-raised so the application does not start
            without a model.
    """
    global model, tokenizer
    logger.info(f"Loading model: {MODEL_ID}")
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        logger.info("Tokenizer loaded successfully")
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID, torch_dtype=torch.float32, device_map=DEVICE, low_cpu_mem_usage=True
        )
        model.eval()
        logger.info("Model loaded successfully!")
        logger.info(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
    except Exception as e:
        logger.error(f"Failed to load model: {e}", exc_info=True)
        raise
    try:
        yield
    finally:
        logger.info("Shutting down, cleaning up model...")
        # Rebind to None instead of `del`: deleting the globals would make any
        # later read of `model` / `tokenizer` (e.g. the /health handler) raise
        # NameError, whereas None still drops the references and keeps
        # "model_loaded" reporting meaningful.
        model = None
        tokenizer = None
|
|
|
|
|
# Application object; model loading and cleanup are delegated to lifespan().
app = FastAPI(
    title="Dual-Compatible API (OpenAI + Anthropic)",
    description="""
Lightweight CPU-based API with dual compatibility:
- OpenAI format: /v1/chat/completions
- Anthropic format: /anthropic/v1/messages
""",
    version="1.0.0",
    lifespan=lifespan,
)

# Fully permissive CORS policy.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# something the FastAPI CORS documentation warns about; confirm that
# credentialed cross-origin requests are really required.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
|
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log the start, outcome and wall-clock duration of every HTTP request.

    Each request is tagged with a short random id (first 8 characters of a
    UUID4).  Exceptions raised further down the stack are logged together
    with the elapsed time and then re-raised unchanged.
    """
    request_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    logger.info(f"[{request_id}] {request.method} {request.url.path} - Started")
    try:
        response = await call_next(request)
        duration = (time.time() - start_time) * 1000
        logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)")
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        logger.error(f"[{request_id}] {request.method} {request.url.path} - Error: {e} ({duration:.2f}ms)")
        raise
    else:
        return response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Plain-text content block of an incoming Anthropic message.
class AnthropicTextBlock(BaseModel):
    type: Literal["text"] = "text"  # block tag
    text: str  # the text payload
|
|
|
|
|
# Where an image block's data comes from: inline base64 or a URL.
class AnthropicImageSource(BaseModel):
    type: Literal["base64", "url"] = "base64"  # source kind
    media_type: Optional[str] = None
    data: Optional[str] = None
    url: Optional[str] = None
|
|
|
|
|
# Image content block; the prompt-building helpers in this file only read
# text blocks, so image blocks are accepted by the schema but not rendered.
class AnthropicImageBlock(BaseModel):
    type: Literal["image"] = "image"  # block tag
    source: AnthropicImageSource
|
|
|
|
|
# Tool invocation block found in message content.
class AnthropicToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"  # block tag
    id: str  # identifier of this tool call
    name: str  # tool being called
    input: Dict[str, Any]  # arguments passed to the tool
|
|
|
|
|
# Result of an earlier tool call, referenced through `tool_use_id`.
class AnthropicToolResultBlock(BaseModel):
    type: Literal["tool_result"] = "tool_result"  # block tag
    tool_use_id: str
    # Either a plain string or a list of text blocks; may be omitted.
    content: Optional[Union[str, List[AnthropicTextBlock]]] = None
    is_error: Optional[bool] = False
|
|
|
|
|
# Any block type allowed inside the `content` list of an incoming message.
AnthropicContentBlock = Union[
    AnthropicTextBlock,
    AnthropicImageBlock,
    AnthropicToolUseBlock,
    AnthropicToolResultBlock,
]
|
|
|
|
|
# One conversation turn in Anthropic format.
class AnthropicMessage(BaseModel):
    role: Literal["user", "assistant"]
    # A bare string or a list of typed content blocks.
    content: Union[str, List[AnthropicContentBlock]]
|
|
|
|
|
# Object schema describing a tool's input.
class AnthropicToolInputSchema(BaseModel):
    type: Literal["object"] = "object"
    properties: Optional[Dict[str, Any]] = None
    required: Optional[List[str]] = None
|
|
|
|
|
# Tool definition supplied by the client.
class AnthropicTool(BaseModel):
    name: str
    description: Optional[str] = None
    input_schema: AnthropicToolInputSchema
|
|
|
|
|
# tool_choice variant tagged "auto".
class AnthropicToolChoiceAuto(BaseModel):
    type: Literal["auto"] = "auto"
    disable_parallel_tool_use: Optional[bool] = None
|
|
|
|
|
# tool_choice variant tagged "any".
class AnthropicToolChoiceAny(BaseModel):
    type: Literal["any"] = "any"
    disable_parallel_tool_use: Optional[bool] = None
|
|
|
|
|
# tool_choice variant tagged "tool", naming one specific tool.
class AnthropicToolChoiceTool(BaseModel):
    type: Literal["tool"] = "tool"
    name: str  # the tool this choice refers to
    disable_parallel_tool_use: Optional[bool] = None
|
|
|
|
|
# Accepted shapes of the request's `tool_choice` field.
AnthropicToolChoice = Union[
    AnthropicToolChoiceAuto,
    AnthropicToolChoiceAny,
    AnthropicToolChoiceTool,
]
|
|
|
|
|
# Optional request metadata; only a user id is modelled.
class AnthropicMetadata(BaseModel):
    user_id: Optional[str] = None
|
|
|
|
|
# One text block of a list-form system prompt.
class AnthropicSystemContent(BaseModel):
    type: Literal["text"] = "text"
    text: str
    # Accepted by the schema; nothing in this file reads it.
    cache_control: Optional[Dict[str, str]] = None
|
|
|
|
|
# Extended-thinking switch plus its token budget (1..128000, default 1024).
class AnthropicThinkingConfig(BaseModel):
    type: Literal["enabled", "disabled"] = "enabled"
    budget_tokens: Optional[int] = Field(1024, ge=1, le=128000)
|
|
|
|
|
# Request body of POST /anthropic/v1/messages.
class AnthropicMessageRequest(BaseModel):
    # Required fields.
    model: str
    max_tokens: int
    messages: List[AnthropicMessage]

    # Optional fields.
    metadata: Optional[AnthropicMetadata] = None
    stop_sequences: Optional[List[str]] = None
    stream: Optional[bool] = False
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    temperature: Optional[float] = Field(1.0, ge=0.0, le=1.0)
    tool_choice: Optional[AnthropicToolChoice] = None
    tools: Optional[List[AnthropicTool]] = None
    top_k: Optional[int] = Field(None, ge=0)
    top_p: Optional[float] = Field(None, ge=0.0, le=1.0)
    thinking: Optional[AnthropicThinkingConfig] = None
|
|
|
|
|
# Token accounting attached to an Anthropic-format response.
class AnthropicUsage(BaseModel):
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None
|
|
|
|
|
# Text block of a response: the final answer.
class AnthropicResponseTextBlock(BaseModel):
    type: Literal["text"] = "text"
    text: str
|
|
|
|
|
# Thinking block of a response: reasoning taken from <thinking> tags.
class AnthropicResponseThinkingBlock(BaseModel):
    type: Literal["thinking"] = "thinking"
    thinking: str
|
|
|
|
|
# Tool-use block of a response.
class AnthropicResponseToolUseBlock(BaseModel):
    type: Literal["tool_use"] = "tool_use"
    id: str
    name: str
    input: Dict[str, Any]
|
|
|
|
|
# Any block type that may appear in a response's `content` list.
AnthropicResponseContentBlock = Union[
    AnthropicResponseTextBlock,
    AnthropicResponseThinkingBlock,
    AnthropicResponseToolUseBlock,
]
|
|
|
|
|
# Response body of POST /anthropic/v1/messages (non-streaming).
class AnthropicMessageResponse(BaseModel):
    id: str
    type: Literal["message"] = "message"
    role: Literal["assistant"] = "assistant"
    content: List[AnthropicResponseContentBlock]
    model: str
    # Why generation ended, and the matching stop sequence if there was one.
    stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None
    stop_sequence: Optional[str] = None
    usage: AnthropicUsage
|
|
|
|
|
# Request body of POST /anthropic/v1/messages/count_tokens.
class AnthropicTokenCountRequest(BaseModel):
    model: str
    messages: List[AnthropicMessage]
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None
    tools: Optional[List[AnthropicTool]] = None
    thinking: Optional[AnthropicThinkingConfig] = None
|
|
|
|
|
# Response body of the token-counting endpoint.
class AnthropicTokenCountResponse(BaseModel):
    input_tokens: int
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# One chat message in OpenAI format.
class OpenAIMessage(BaseModel):
    role: Literal["system", "user", "assistant", "tool"]
    # A string, a list of content-part dicts, or absent.
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None
|
|
|
|
|
# Function-tool definition in OpenAI format.
class OpenAITool(BaseModel):
    type: Literal["function"] = "function"
    function: Dict[str, Any]
|
|
|
|
|
# Object form of the OpenAI `tool_choice` field.
class OpenAIToolChoice(BaseModel):
    type: str
    function: Optional[Dict[str, str]] = None
|
|
|
|
|
# Request body of POST /v1/chat/completions.
class OpenAIChatRequest(BaseModel):
    # Required fields.
    model: str
    messages: List[OpenAIMessage]

    # Generation controls.
    max_tokens: Optional[int] = 1024
    temperature: Optional[float] = Field(1.0, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(1.0, ge=0.0, le=1.0)
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None

    # Remaining OpenAI request fields.
    presence_penalty: Optional[float] = 0.0
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, OpenAIToolChoice]] = None
    seed: Optional[int] = None
|
|
|
|
|
# Token accounting attached to an OpenAI-format response.
class OpenAIUsage(BaseModel):
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
|
|
|
|
|
# One completion choice of a non-streaming response.
class OpenAIChoice(BaseModel):
    index: int
    message: Dict[str, Any]
    finish_reason: Optional[str] = None
|
|
|
|
|
# Response body of POST /v1/chat/completions (non-streaming).
class OpenAIChatResponse(BaseModel):
    id: str
    object: Literal["chat.completion"] = "chat.completion"
    created: int
    model: str
    choices: List[OpenAIChoice]
    usage: OpenAIUsage
    system_fingerprint: Optional[str] = None
|
|
|
|
|
# One choice inside a streamed chunk; `delta` carries the increment.
class OpenAIStreamChoice(BaseModel):
    index: int
    delta: Dict[str, Any]
    finish_reason: Optional[str] = None
|
|
|
|
|
# Shape of a single streamed chat-completion chunk.
class OpenAIStreamResponse(BaseModel):
    id: str
    object: Literal["chat.completion.chunk"] = "chat.completion.chunk"
    created: int
    model: str
    choices: List[OpenAIStreamChoice]
|
|
|
|
|
# One entry of the model listing.
class OpenAIModel(BaseModel):
    id: str
    object: Literal["model"] = "model"
    created: int
    owned_by: str
|
|
|
|
|
# Response body of GET /v1/models.
class OpenAIModelList(BaseModel):
    object: Literal["list"] = "list"
    data: List[OpenAIModel]
|
|
|
|
|
|
|
|
|
|
|
def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str:
    """Flatten Anthropic message content into a single string.

    A plain string is returned unchanged.  For a list, only blocks whose
    type is "text" contribute (given either as dicts or as objects with
    ``type`` / ``text`` attributes); their texts are joined with one space.
    """
    if isinstance(content, str):
        return content

    def _text_parts():
        for block in content:
            if isinstance(block, dict):
                if block.get("type") == "text":
                    yield block.get("text", "")
            elif getattr(block, "type", None) == "text":
                yield block.text

    return " ".join(_text_parts())
|
|
|
|
|
def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]:
    """Flatten the Anthropic ``system`` field into a single string.

    ``None`` and plain strings are returned as they are.  For a list, the
    ``text`` of every dict or text-bearing object is collected and the
    pieces are joined with one space; items without text are skipped.
    """
    if system is None or isinstance(system, str):
        return system

    pieces = []
    for entry in system:
        if isinstance(entry, dict):
            pieces.append(entry.get("text", ""))
            continue
        if hasattr(entry, "text"):
            pieces.append(entry.text)
    return " ".join(pieces)
|
|
|
|
|
def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str:
    """Flatten OpenAI message content into a single string.

    ``None`` becomes the empty string and a plain string is returned as is.
    For a list of content parts, only dict parts whose type is "text" are
    used; their texts are joined with one space.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return " ".join(
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )
|
|
|
|
|
def format_anthropic_messages(
    messages: List[AnthropicMessage],
    system: Optional[Union[str, List[AnthropicSystemContent]]] = None,
    thinking_enabled: bool = False,
    budget_tokens: int = 1024
) -> str:
    """Render an Anthropic-style conversation into one prompt string.

    Args:
        messages: Conversation turns; only their text content is used.
        system: Optional system prompt (string or list of text blocks).
        thinking_enabled: When true, an instruction asking the model to
            reason inside <thinking> tags is placed before the system prompt.
        budget_tokens: Number quoted inside that instruction.

    Returns:
        The tokenizer's chat-template rendering (with generation prompt) when
        the tokenizer has a chat template, otherwise a simple "Role: text"
        transcript that ends with "Assistant: ".
    """
    system_text = extract_anthropic_system(system)

    if thinking_enabled:
        thinking_instruction = f"""You are a helpful AI assistant with extended thinking capabilities.

When responding to complex problems:
1. First, think through the problem step by step inside <thinking>...</thinking> tags
2. Consider multiple approaches and evaluate them
3. Show your reasoning process clearly
4. After thinking, provide your final answer outside the thinking tags

Budget for thinking: up to {budget_tokens} tokens for reasoning.

Think deeply and thoroughly before responding."""
        # The instruction goes first; a caller-supplied system prompt follows it.
        system_text = f"{thinking_instruction}\n\n{system_text}" if system_text else thinking_instruction

    conversation = [{"role": "system", "content": system_text}] if system_text else []
    conversation.extend(
        {"role": turn.role, "content": extract_anthropic_text(turn.content)}
        for turn in messages
    )

    if tokenizer.chat_template:
        return tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)

    # Fallback for tokenizers that have no chat template.
    transcript = [f"{turn['role'].capitalize()}: {turn['content']}\n" for turn in conversation]
    return "".join(transcript) + "Assistant: "
|
|
|
|
|
def format_openai_messages(messages: List[OpenAIMessage]) -> str:
    """Render an OpenAI-style conversation into one prompt string.

    Every message is reduced to its role and text content.  The tokenizer's
    chat template is used (with generation prompt) when one exists;
    otherwise a simple "Role: text" transcript ending in "Assistant: " is
    produced.
    """
    conversation = [
        {"role": turn.role, "content": extract_openai_content(turn.content)}
        for turn in messages
    ]

    if tokenizer.chat_template:
        return tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)

    # Fallback for tokenizers that have no chat template.
    transcript = [f"{turn['role'].capitalize()}: {turn['content']}\n" for turn in conversation]
    return "".join(transcript) + "Assistant: "
|
|
|
|
|
def parse_thinking_response(text: str) -> tuple:
    """Split model output into ``(thinking_text, answer_text)``.

    Every ``<thinking>...</thinking>`` section is removed from the answer and
    the sections are joined with newlines to form the thinking text.  A
    trailing ``<thinking>`` that was never closed (for example because the
    output was cut off by the token limit) is treated as thinking up to the
    end of the text, so neither the raw tag nor the partial reasoning leaks
    into the answer.

    Args:
        text: Raw decoded model output.

    Returns:
        A 2-tuple.  The first item is the stripped thinking text, or ``None``
        when the output contains no thinking section at all; the second item
        is the stripped answer text.
    """
    thinking_pattern = r'<thinking>(.*?)</thinking>'
    sections = re.findall(thinking_pattern, text, re.DOTALL)
    remainder = re.sub(thinking_pattern, '', text, flags=re.DOTALL)

    # Whatever opening tag is left after removing the closed sections has no
    # matching close tag: everything behind it is unfinished reasoning.
    open_tag = "<thinking>"
    dangling = remainder.find(open_tag)
    if dangling != -1:
        sections.append(remainder[dangling + len(open_tag):])
        remainder = remainder[:dangling]

    if sections:
        return "\n".join(sections).strip(), remainder.strip()
    return None, text.strip()
|
|
|
|
|
def generate_id(prefix: str = "msg") -> str:
    """Return ``<prefix>_`` followed by 24 random hexadecimal characters."""
    random_part = uuid.uuid4().hex[:24]
    return f"{prefix}_{random_part}"
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    # Service descriptor: status, model id, endpoint paths, SDK base URLs,
    # feature list and the log file path.
    endpoints = {
        "openai": "/v1/chat/completions",
        "anthropic": "/anthropic/v1/messages",
    }
    base_urls = {
        "openai_sdk": "https://likhonsheikh-anthropic-compatible-api.hf.space/v1",
        "anthropic_sdk": "https://likhonsheikh-anthropic-compatible-api.hf.space/anthropic",
    }
    features = ["extended-thinking", "streaming", "dual-compatibility"]
    return {
        "status": "healthy",
        "model": MODEL_ID,
        "endpoints": endpoints,
        "base_urls": base_urls,
        "features": features,
        "log_file": LOG_FILE,
    }
|
|
|
|
|
@app.get("/logs")
async def get_logs(lines: int = 100):
    # Return the last `lines` lines of the API log file.
    #
    # lines: how many trailing lines to return.  Zero or a negative value
    #        yields no lines; previously 0 returned the whole file and a
    #        negative value skipped lines from the start of the file.
    #
    # The result holds the log path, the total line count, the number of
    # lines returned and the lines joined into one string; when the file does
    # not exist an error payload is returned instead.
    limit = max(lines, 0)
    try:
        # The handler writes the file as UTF-8, so read it the same way
        # instead of relying on the locale; undecodable bytes are replaced
        # rather than failing the request.
        with open(LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
            all_lines = f.readlines()
    except FileNotFoundError:
        return {"error": "Log file not found", "log_file": LOG_FILE}

    # Slice only for a positive limit: all_lines[-0:] would be the whole list.
    recent_lines = all_lines[-limit:] if limit else []
    return {"log_file": LOG_FILE, "total_lines": len(all_lines), "returned_lines": len(recent_lines), "logs": "".join(recent_lines)}
|
|
|
|
|
@app.get("/health")
async def health():
    # Liveness probe; "model_loaded" reflects whether the global model is set.
    supported = ["openai-compatible", "anthropic-compatible", "extended-thinking"]
    return {
        "status": "ok",
        "model_loaded": model is not None,
        "log_file": LOG_FILE,
        "features": supported,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/v1/models")
async def openai_list_models():
    """List models (OpenAI format)"""
    # A single entry is advertised; `created` is simply the current time.
    entry = OpenAIModel(id="qwen2.5-coder-3b", created=int(time.time()), owned_by="qwen")
    return OpenAIModelList(data=[entry])
|
|
|
|
|
@app.post("/v1/chat/completions")
async def openai_chat_completions(
    request: OpenAIChatRequest,
    authorization: Optional[str] = Header(None)
):
    """Chat completions (OpenAI format)"""
    # Flow: render the messages into a prompt, tokenize it, then either hand
    # over to the streaming helper or generate the whole completion here.
    # Any failure is logged and reported to the client as HTTP 500.
    #
    # NOTE: model.generate() runs synchronously inside this coroutine, so the
    # event loop is busy for the whole generation.
    chat_id = generate_id("chatcmpl")
    logger.info(f"[{chat_id}] OpenAI chat - model: {request.model}, max_tokens: {request.max_tokens}, stream: {request.stream}")

    try:
        prompt = format_openai_messages(request.messages)
        inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
        input_token_count = inputs.input_ids.shape[1]

        if request.stream:
            return await openai_stream_response(request, inputs, input_token_count, chat_id)

        max_new_tokens = request.max_tokens or 1024
        sampling = bool(request.temperature and request.temperature > 0)

        gen_kwargs = {
            "max_new_tokens": max_new_tokens,
            "do_sample": sampling,
            "pad_token_id": tokenizer.eos_token_id,
            "eos_token_id": tokenizer.eos_token_id,
        }

        # Sampling parameters only have an effect when sampling is on.
        if sampling:
            gen_kwargs["temperature"] = min(request.temperature, 1.0)
            if request.top_p:
                gen_kwargs["top_p"] = request.top_p

        # Normalise `stop` to a list of non-empty strings.
        stop_seqs = []
        if request.stop:
            requested = [request.stop] if isinstance(request.stop, str) else request.stop
            stop_seqs = [seq for seq in requested if seq]

        # Only a stop sequence that encodes to exactly ONE token can double as
        # an end-of-sequence id.  Registering every token of a multi-token
        # sequence (the previous behaviour) ended generation as soon as any
        # one of those tokens showed up.  Longer sequences are enforced by
        # truncating the decoded text below.
        eos_ids = [tokenizer.eos_token_id]
        for seq in stop_seqs:
            seq_ids = tokenizer.encode(seq, add_special_tokens=False)
            if len(seq_ids) == 1 and seq_ids[0] not in eos_ids:
                eos_ids.append(seq_ids[0])
        if len(eos_ids) > 1:
            gen_kwargs["eos_token_id"] = eos_ids

        gen_start = time.time()
        with torch.no_grad():
            outputs = model.generate(**inputs, **gen_kwargs)
        gen_time = time.time() - gen_start

        # Keep only the newly generated tokens (the prompt comes first).
        generated_tokens = outputs[0][input_token_count:]
        generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
        output_token_count = len(generated_tokens)

        # Cut the text at the earliest stop sequence; the sequence itself is
        # not part of the returned completion.
        stop_positions = [pos for pos in (generated_text.find(seq) for seq in stop_seqs) if pos != -1]
        stopped_on_sequence = bool(stop_positions)
        if stopped_on_sequence:
            generated_text = generated_text[:min(stop_positions)]

        finish_reason = "stop"
        if not stopped_on_sequence and output_token_count >= max_new_tokens:
            finish_reason = "length"

        logger.info(f"[{chat_id}] Generated {output_token_count} tokens in {gen_time:.2f}s")

        return OpenAIChatResponse(
            id=chat_id,
            created=int(time.time()),
            model=request.model,
            choices=[OpenAIChoice(
                index=0,
                message={"role": "assistant", "content": generated_text.strip()},
                finish_reason=finish_reason
            )],
            usage=OpenAIUsage(
                prompt_tokens=input_token_count,
                completion_tokens=output_token_count,
                total_tokens=input_token_count + output_token_count
            )
        )

    except Exception as e:
        logger.error(f"[{chat_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
async def openai_stream_response(request: OpenAIChatRequest, inputs, input_token_count: int, chat_id: str):
    """Stream a chat completion as OpenAI-style server-sent events.

    Args:
        request: The validated chat request (model name, limits, sampling).
        inputs: Tokenized prompt, unpacked into ``model.generate``.
        input_token_count: Prompt length in tokens (not used while streaming).
        chat_id: Identifier repeated in every chunk.

    Returns:
        A ``StreamingResponse`` emitting an initial role chunk, one chunk per
        decoded text piece, a final chunk carrying ``finish_reason`` and the
        ``[DONE]`` sentinel.

    NOTE: ``request.stop`` is not applied on this streaming path.
    """

    # A plain (synchronous) generator on purpose: iterating the streamer
    # blocks on a queue, and StreamingResponse runs synchronous iterators in a
    # worker thread, so the event loop stays free between chunks.
    def generate():
        created = int(time.time())

        def chunk(delta, finish_reason=None):
            payload = {
                "id": chat_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": request.model,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}]
            }
            return f"data: {json.dumps(payload)}\n\n"

        yield chunk({"role": "assistant", "content": ""})

        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
        max_new_tokens = request.max_tokens or 1024
        sampling = bool(request.temperature and request.temperature > 0)

        gen_kwargs = {
            **inputs,
            "max_new_tokens": max_new_tokens,
            "do_sample": sampling,
            "pad_token_id": tokenizer.eos_token_id,
            "eos_token_id": tokenizer.eos_token_id,
            "streamer": streamer,
        }

        # Sampling parameters only have an effect when sampling is on.
        if sampling:
            gen_kwargs["temperature"] = min(request.temperature, 1.0)
            if request.top_p:
                gen_kwargs["top_p"] = request.top_p

        def run_generation():
            # If generate() dies, nothing would ever signal the end of the
            # stream and the loop below would wait forever; close the
            # streamer explicitly so the response still terminates.
            try:
                model.generate(**gen_kwargs)
            except Exception:
                logger.error(f"[{chat_id}] Streaming generation failed", exc_info=True)
                streamer.end()

        thread = Thread(target=run_generation)
        thread.start()

        output_tokens = 0
        for text in streamer:
            if text:
                # Approximate count: re-encode each emitted piece of text.
                output_tokens += len(tokenizer.encode(text, add_special_tokens=False))
                yield chunk({"content": text})

        thread.join()

        finish_reason = "length" if output_tokens >= max_new_tokens else "stop"
        yield chunk({}, finish_reason)
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/anthropic/v1/models")
async def anthropic_list_models():
    """List models (Anthropic format)"""
    # A single entry is advertised; `created` is simply the current time.
    entry = {
        "id": "qwen2.5-coder-3b",
        "object": "model",
        "created": int(time.time()),
        "owned_by": "qwen",
        "display_name": "Qwen2.5 Coder 3B Instruct",
        "supports_thinking": True,
    }
    return {"object": "list", "data": [entry]}
|
|
|
|
|
@app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse)
async def anthropic_create_message(
    request: AnthropicMessageRequest,
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    anthropic_version: Optional[str] = Header(None, alias="anthropic-version"),
    anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta")
):
    """Create message (Anthropic format with Extended Thinking)"""
    # Flow: render the conversation (plus the thinking instruction when
    # enabled) into a prompt, tokenize it, then either hand over to the
    # streaming helper or generate the whole reply here.  Any failure is
    # logged and reported to the client as HTTP 500.
    #
    # NOTE: model.generate() runs synchronously inside this coroutine, so the
    # event loop is busy for the whole generation.
    message_id = generate_id("msg")

    thinking_enabled = False
    budget_tokens = 1024
    if request.thinking:
        thinking_enabled = request.thinking.type == "enabled"
        budget_tokens = request.thinking.budget_tokens or 1024

    logger.info(f"[{message_id}] Anthropic msg - model: {request.model}, max_tokens: {request.max_tokens}, thinking: {thinking_enabled}")

    try:
        prompt = format_anthropic_messages(request.messages, request.system, thinking_enabled, budget_tokens)
        inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
        input_token_count = inputs.input_ids.shape[1]

        if request.stream:
            return await anthropic_stream_response(request, inputs, input_token_count, message_id, thinking_enabled, budget_tokens)

        # The thinking budget is granted on top of the answer budget.
        total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)
        sampling = bool(request.temperature and request.temperature > 0)

        gen_kwargs = {
            "max_new_tokens": total_max_tokens,
            "do_sample": sampling,
            "pad_token_id": tokenizer.eos_token_id,
            "eos_token_id": tokenizer.eos_token_id,
        }

        # Sampling parameters only have an effect when sampling is on.
        if sampling:
            gen_kwargs["temperature"] = request.temperature
            if request.top_p:
                gen_kwargs["top_p"] = request.top_p
            if request.top_k:
                gen_kwargs["top_k"] = request.top_k

        gen_start = time.time()
        with torch.no_grad():
            outputs = model.generate(**inputs, **gen_kwargs)
        gen_time = time.time() - gen_start

        # Keep only the newly generated tokens (the prompt comes first).
        generated_tokens = outputs[0][input_token_count:]
        generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)
        output_token_count = len(generated_tokens)

        if thinking_enabled:
            thinking_text, answer_text = parse_thinking_response(generated_text)
        else:
            thinking_text, answer_text = None, generated_text.strip()

        # Honour `stop_sequences` (previously accepted but ignored): cut the
        # answer at the earliest match and report which sequence ended it.
        stop_sequence = None
        stop_hits = [
            (answer_text.find(seq), seq)
            for seq in (request.stop_sequences or [])
            if seq and seq in answer_text
        ]
        if stop_hits:
            cut_at, stop_sequence = min(stop_hits, key=lambda hit: hit[0])
            answer_text = answer_text[:cut_at].strip()

        content_blocks = []
        if thinking_text:
            content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text))
        content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text))

        if stop_sequence is not None:
            stop_reason = "stop_sequence"
        elif output_token_count >= total_max_tokens:
            stop_reason = "max_tokens"
        else:
            stop_reason = "end_turn"

        logger.info(f"[{message_id}] Generated {output_token_count} tokens in {gen_time:.2f}s")

        return AnthropicMessageResponse(
            id=message_id,
            content=content_blocks,
            model=request.model,
            stop_reason=stop_reason,
            stop_sequence=stop_sequence,
            usage=AnthropicUsage(input_tokens=input_token_count, output_tokens=output_token_count)
        )

    except Exception as e:
        logger.error(f"[{message_id}] Error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
def _split_thinking_stream(chunks, thinking_enabled):
    """Yield ``(kind, text)`` pairs, kind being "thinking" or "text".

    With thinking disabled every non-empty chunk is passed through as text.
    With thinking enabled the concatenated stream is split on the
    ``<thinking>`` / ``</thinking>`` tags, which are removed.  A chunk that
    ends in what could be the beginning of the next expected tag is held back
    until later chunks settle it, so tags split across chunks are still
    recognised and never leak into the output.
    """
    if not thinking_enabled:
        for chunk in chunks:
            if chunk:
                yield "text", chunk
        return

    open_tag, close_tag = "<thinking>", "</thinking>"
    in_thinking = False
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            tag = close_tag if in_thinking else open_tag
            kind = "thinking" if in_thinking else "text"
            position = buffer.find(tag)
            if position == -1:
                break
            if position:
                yield kind, buffer[:position]
            buffer = buffer[position + len(tag):]
            in_thinking = not in_thinking
        # `tag` / `kind` now describe the current mode.  Keep back the longest
        # buffer suffix that is a proper prefix of the tag we are waiting for.
        held = 0
        for size in range(min(len(tag) - 1, len(buffer)), 0, -1):
            if buffer.endswith(tag[:size]):
                held = size
                break
        ready = buffer[:len(buffer) - held]
        if ready:
            yield kind, ready
        buffer = buffer[len(buffer) - held:]

    # End of stream: a held-back fragment was not a tag after all.
    if buffer:
        yield ("thinking" if in_thinking else "text"), buffer


async def anthropic_stream_response(request: AnthropicMessageRequest, inputs, input_token_count: int, message_id: str, thinking_enabled: bool, budget_tokens: int):
    """Stream a message as Anthropic-style server-sent events.

    Args:
        request: The validated message request.
        inputs: Tokenized prompt, unpacked into ``model.generate``.
        input_token_count: Prompt length, reported in ``message_start``.
        message_id: Identifier reported in ``message_start``.
        thinking_enabled: Whether ``<thinking>`` sections are split out into
            separate thinking content blocks.
        budget_tokens: Extra generation budget granted when thinking is on.

    Returns:
        A ``StreamingResponse`` emitting ``message_start``, ``ping``, then for
        every content block a start, its deltas and a stop, followed by
        ``message_delta`` and ``message_stop``.  Every started block is
        stopped, and at least one (possibly empty) text block is emitted.

    NOTE: ``request.stop_sequences`` is not applied on this streaming path.
    """

    # A plain (synchronous) generator on purpose: iterating the streamer
    # blocks on a queue, and StreamingResponse runs synchronous iterators in a
    # worker thread, so the event loop stays free between chunks.
    def generate():
        def sse(payload):
            # The SSE event name always equals the payload's "type".
            return f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n"

        yield sse({
            "type": "message_start",
            "message": {
                "id": message_id, "type": "message", "role": "assistant", "content": [],
                "model": request.model, "stop_reason": None, "stop_sequence": None,
                "usage": {"input_tokens": input_token_count, "output_tokens": 0}
            }
        })
        yield sse({"type": "ping"})

        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
        total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0)
        sampling = bool(request.temperature and request.temperature > 0)

        gen_kwargs = {
            **inputs,
            "max_new_tokens": total_max_tokens,
            "do_sample": sampling,
            "pad_token_id": tokenizer.eos_token_id,
            "eos_token_id": tokenizer.eos_token_id,
            "streamer": streamer,
        }

        # Sampling parameters only have an effect when sampling is on.
        if sampling:
            gen_kwargs["temperature"] = request.temperature
            if request.top_p:
                gen_kwargs["top_p"] = request.top_p
            if request.top_k:
                gen_kwargs["top_k"] = request.top_k

        def run_generation():
            # If generate() dies, nothing would ever signal the end of the
            # stream and the loop below would wait forever; close the
            # streamer explicitly so the response still terminates.
            try:
                model.generate(**gen_kwargs)
            except Exception:
                logger.error(f"[{message_id}] Streaming generation failed", exc_info=True)
                streamer.end()

        thread = Thread(target=run_generation)
        thread.start()

        output_tokens = 0

        def counted(chunks):
            # Approximate count: re-encode each emitted piece of text.
            nonlocal output_tokens
            for chunk in chunks:
                if chunk:
                    output_tokens += len(tokenizer.encode(chunk, add_special_tokens=False))
                    yield chunk

        block_index = -1   # index of the block currently open (-1: none yet)
        open_kind = None   # "thinking" / "text" for the open block, else None

        for kind, piece in _split_thinking_stream(counted(streamer), thinking_enabled):
            if kind != open_kind:
                if thinking_enabled:
                    # Whitespace around the tags should not open a block or
                    # lead one.
                    piece = piece.lstrip()
                if not piece:
                    continue
                if open_kind is not None:
                    yield sse({"type": "content_block_stop", "index": block_index})
                block_index += 1
                open_kind = kind
                yield sse({"type": "content_block_start", "index": block_index, "content_block": {"type": kind, kind: ""}})
            yield sse({"type": "content_block_delta", "index": block_index, "delta": {"type": f"{kind}_delta", kind: piece}})

        thread.join()

        if open_kind is None:
            # Nothing was generated: still emit an (empty) text block so the
            # closing stop event below has a matching start.
            block_index += 1
            yield sse({"type": "content_block_start", "index": block_index, "content_block": {"type": "text", "text": ""}})
        yield sse({"type": "content_block_stop", "index": block_index})

        stop_reason = "max_tokens" if output_tokens >= total_max_tokens else "end_turn"
        yield sse({"type": "message_delta", "delta": {"stop_reason": stop_reason}, "usage": {"output_tokens": output_tokens}})
        yield sse({"type": "message_stop"})

    return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"})
|
|
|
|
|
@app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse)
async def anthropic_count_tokens(request: AnthropicTokenCountRequest):
    # Count the prompt tokens a message request would produce, using the same
    # prompt rendering as the message endpoint.
    #
    # Settings are normalised exactly as in the message endpoint:
    # `thinking_enabled` is always a real bool (it used to be None when no
    # thinking config was sent) and an explicit null budget falls back to 1024
    # instead of being rendered as "up to None tokens" in the prompt.
    thinking_enabled = False
    budget_tokens = 1024
    if request.thinking:
        thinking_enabled = request.thinking.type == "enabled"
        budget_tokens = request.thinking.budget_tokens or 1024

    prompt = format_anthropic_messages(request.messages, request.system, thinking_enabled, budget_tokens)
    tokens = tokenizer.encode(prompt)
    return AnthropicTokenCountResponse(input_tokens=len(tokens))
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces, port 7860.
    # log_config=None stops uvicorn from installing its own logging config,
    # leaving the handlers configured in this module in place.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860, "log_config": None}
    uvicorn.run(app, **server_options)
|
|
|