""" Dual-Compatible API Endpoint (OpenAI + Anthropic) Lightweight CPU-based implementation for Hugging Face Spaces - OpenAI format: /v1/chat/completions - Anthropic format: /anthropic/v1/messages """ import os import time import uuid import logging import re from datetime import datetime from logging.handlers import RotatingFileHandler from typing import List, Optional, Union, Dict, Any, Literal from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Header, Request from fastapi.responses import StreamingResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field import torch from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer from threading import Thread import json # ============== Logging Configuration ============== LOG_DIR = "/tmp/logs" os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE = os.path.join(LOG_DIR, "api.log") log_format = logging.Formatter( '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler = RotatingFileHandler( LOG_FILE, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter(log_format) file_handler.setLevel(logging.DEBUG) console_handler = logging.StreamHandler() console_handler.setFormatter(log_format) console_handler.setLevel(logging.INFO) logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, console_handler]) logger = logging.getLogger("dual-api") for uvicorn_logger in ["uvicorn", "uvicorn.error", "uvicorn.access"]: uv_log = logging.getLogger(uvicorn_logger) uv_log.handlers = [file_handler, console_handler] logger.info("=" * 60) logger.info(f"Dual API (OpenAI + Anthropic) Startup at {datetime.now().isoformat()}") logger.info(f"Log file: {LOG_FILE}") logger.info("=" * 60) # ============== Configuration ============== MODEL_ID = "Qwen/Qwen2.5-Coder-3B-Instruct" DEVICE = "cpu" model = None tokenizer = None @asynccontextmanager async def lifespan(app: FastAPI): global model, tokenizer logger.info(f"Loading model: {MODEL_ID}") try: tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) logger.info("Tokenizer loaded successfully") model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.float32, device_map=DEVICE, low_cpu_mem_usage=True ) model.eval() logger.info("Model loaded successfully!") logger.info(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}") except Exception as e: logger.error(f"Failed to load model: {e}", exc_info=True) raise yield logger.info("Shutting down, cleaning up model...") del model, tokenizer app = FastAPI( title="Dual-Compatible API (OpenAI + Anthropic)", description=""" Lightweight CPU-based API with dual compatibility: - OpenAI format: /v1/chat/completions - Anthropic format: /anthropic/v1/messages """, version="1.0.0", lifespan=lifespan ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.middleware("http") async def log_requests(request: Request, call_next): request_id = str(uuid.uuid4())[:8] start_time = time.time() logger.info(f"[{request_id}] {request.method} {request.url.path} - Started") try: response = await call_next(request) duration = (time.time() - start_time) * 1000 logger.info(f"[{request_id}] {request.method} {request.url.path} - {response.status_code} ({duration:.2f}ms)") return response except Exception as e: duration = (time.time() - start_time) * 1000 logger.error(f"[{request_id}] {request.method} {request.url.path} - Error: {e} ({duration:.2f}ms)") raise # ============================================================ # ANTHROPIC-COMPATIBLE MODELS (under /anthropic) # ============================================================ class AnthropicTextBlock(BaseModel): type: Literal["text"] = "text" text: str class AnthropicImageSource(BaseModel): type: Literal["base64", "url"] = "base64" media_type: Optional[str] = None data: Optional[str] = None url: Optional[str] = None class AnthropicImageBlock(BaseModel): type: Literal["image"] = "image" source: AnthropicImageSource class AnthropicToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str name: str input: Dict[str, Any] class AnthropicToolResultBlock(BaseModel): type: Literal["tool_result"] = "tool_result" tool_use_id: str content: Optional[Union[str, List[AnthropicTextBlock]]] = None is_error: Optional[bool] = False AnthropicContentBlock = Union[AnthropicTextBlock, AnthropicImageBlock, AnthropicToolUseBlock, AnthropicToolResultBlock] class AnthropicMessage(BaseModel): role: Literal["user", "assistant"] content: Union[str, List[AnthropicContentBlock]] class AnthropicToolInputSchema(BaseModel): type: Literal["object"] = "object" properties: Optional[Dict[str, Any]] = None required: Optional[List[str]] = None class AnthropicTool(BaseModel): name: str description: Optional[str] = None input_schema: AnthropicToolInputSchema class AnthropicToolChoiceAuto(BaseModel): type: Literal["auto"] = "auto" disable_parallel_tool_use: Optional[bool] = None class AnthropicToolChoiceAny(BaseModel): type: Literal["any"] = "any" disable_parallel_tool_use: Optional[bool] = None class AnthropicToolChoiceTool(BaseModel): type: Literal["tool"] = "tool" name: str disable_parallel_tool_use: Optional[bool] = None AnthropicToolChoice = Union[AnthropicToolChoiceAuto, AnthropicToolChoiceAny, AnthropicToolChoiceTool] class AnthropicMetadata(BaseModel): user_id: Optional[str] = None class AnthropicSystemContent(BaseModel): type: Literal["text"] = "text" text: str cache_control: Optional[Dict[str, str]] = None class AnthropicThinkingConfig(BaseModel): type: Literal["enabled", "disabled"] = "enabled" budget_tokens: Optional[int] = Field(default=1024, ge=1, le=128000) class AnthropicMessageRequest(BaseModel): model: str max_tokens: int messages: List[AnthropicMessage] metadata: Optional[AnthropicMetadata] = None stop_sequences: Optional[List[str]] = None stream: Optional[bool] = False system: Optional[Union[str, List[AnthropicSystemContent]]] = None temperature: Optional[float] = Field(default=1.0, ge=0.0, le=1.0) tool_choice: Optional[AnthropicToolChoice] = None tools: Optional[List[AnthropicTool]] = None top_k: Optional[int] = Field(default=None, ge=0) top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0) thinking: Optional[AnthropicThinkingConfig] = None class AnthropicUsage(BaseModel): input_tokens: int output_tokens: int cache_creation_input_tokens: Optional[int] = None cache_read_input_tokens: Optional[int] = None class AnthropicResponseTextBlock(BaseModel): type: Literal["text"] = "text" text: str class AnthropicResponseThinkingBlock(BaseModel): type: Literal["thinking"] = "thinking" thinking: str class AnthropicResponseToolUseBlock(BaseModel): type: Literal["tool_use"] = "tool_use" id: str name: str input: Dict[str, Any] AnthropicResponseContentBlock = Union[AnthropicResponseTextBlock, AnthropicResponseThinkingBlock, AnthropicResponseToolUseBlock] class AnthropicMessageResponse(BaseModel): id: str type: Literal["message"] = "message" role: Literal["assistant"] = "assistant" content: List[AnthropicResponseContentBlock] model: str stop_reason: Optional[Literal["end_turn", "max_tokens", "stop_sequence", "tool_use"]] = None stop_sequence: Optional[str] = None usage: AnthropicUsage class AnthropicTokenCountRequest(BaseModel): model: str messages: List[AnthropicMessage] system: Optional[Union[str, List[AnthropicSystemContent]]] = None tools: Optional[List[AnthropicTool]] = None thinking: Optional[AnthropicThinkingConfig] = None class AnthropicTokenCountResponse(BaseModel): input_tokens: int # ============================================================ # OPENAI-COMPATIBLE MODELS (under /v1) # ============================================================ class OpenAIMessage(BaseModel): role: Literal["system", "user", "assistant", "tool"] content: Optional[Union[str, List[Dict[str, Any]]]] = None name: Optional[str] = None tool_calls: Optional[List[Dict[str, Any]]] = None tool_call_id: Optional[str] = None class OpenAITool(BaseModel): type: Literal["function"] = "function" function: Dict[str, Any] class OpenAIToolChoice(BaseModel): type: str function: Optional[Dict[str, str]] = None class OpenAIChatRequest(BaseModel): model: str messages: List[OpenAIMessage] max_tokens: Optional[int] = 1024 temperature: Optional[float] = Field(default=1.0, ge=0.0, le=2.0) top_p: Optional[float] = Field(default=1.0, ge=0.0, le=1.0) n: Optional[int] = 1 stream: Optional[bool] = False stop: Optional[Union[str, List[str]]] = None presence_penalty: Optional[float] = 0.0 frequency_penalty: Optional[float] = 0.0 logit_bias: Optional[Dict[str, float]] = None user: Optional[str] = None tools: Optional[List[OpenAITool]] = None tool_choice: Optional[Union[str, OpenAIToolChoice]] = None seed: Optional[int] = None class OpenAIUsage(BaseModel): prompt_tokens: int completion_tokens: int total_tokens: int class OpenAIChoice(BaseModel): index: int message: Dict[str, Any] finish_reason: Optional[str] = None class OpenAIChatResponse(BaseModel): id: str object: Literal["chat.completion"] = "chat.completion" created: int model: str choices: List[OpenAIChoice] usage: OpenAIUsage system_fingerprint: Optional[str] = None class OpenAIStreamChoice(BaseModel): index: int delta: Dict[str, Any] finish_reason: Optional[str] = None class OpenAIStreamResponse(BaseModel): id: str object: Literal["chat.completion.chunk"] = "chat.completion.chunk" created: int model: str choices: List[OpenAIStreamChoice] class OpenAIModel(BaseModel): id: str object: Literal["model"] = "model" created: int owned_by: str class OpenAIModelList(BaseModel): object: Literal["list"] = "list" data: List[OpenAIModel] # ============== Helper Functions ============== def extract_anthropic_text(content: Union[str, List[AnthropicContentBlock]]) -> str: if isinstance(content, str): return content texts = [] for block in content: if isinstance(block, dict): if block.get("type") == "text": texts.append(block.get("text", "")) elif hasattr(block, "type") and block.type == "text": texts.append(block.text) return " ".join(texts) def extract_anthropic_system(system: Optional[Union[str, List[AnthropicSystemContent]]]) -> Optional[str]: if system is None: return None if isinstance(system, str): return system texts = [] for block in system: if isinstance(block, dict): texts.append(block.get("text", "")) elif hasattr(block, "text"): texts.append(block.text) return " ".join(texts) def extract_openai_content(content: Optional[Union[str, List[Dict[str, Any]]]]) -> str: if content is None: return "" if isinstance(content, str): return content texts = [] for item in content: if isinstance(item, dict) and item.get("type") == "text": texts.append(item.get("text", "")) return " ".join(texts) def format_anthropic_messages( messages: List[AnthropicMessage], system: Optional[Union[str, List[AnthropicSystemContent]]] = None, thinking_enabled: bool = False, budget_tokens: int = 1024 ) -> str: formatted_messages = [] system_text = extract_anthropic_system(system) if thinking_enabled: thinking_instruction = f"""You are a helpful AI assistant with extended thinking capabilities. When responding to complex problems: 1. First, think through the problem step by step inside ... tags 2. Consider multiple approaches and evaluate them 3. Show your reasoning process clearly 4. After thinking, provide your final answer outside the thinking tags Budget for thinking: up to {budget_tokens} tokens for reasoning. Think deeply and thoroughly before responding.""" if system_text: system_text = f"{thinking_instruction}\n\n{system_text}" else: system_text = thinking_instruction if system_text: formatted_messages.append({"role": "system", "content": system_text}) for msg in messages: content = extract_anthropic_text(msg.content) formatted_messages.append({"role": msg.role, "content": content}) if tokenizer.chat_template: return tokenizer.apply_chat_template(formatted_messages, tokenize=False, add_generation_prompt=True) prompt = "" for msg in formatted_messages: role = msg["role"].capitalize() prompt += f"{role}: {msg['content']}\n" prompt += "Assistant: " return prompt def format_openai_messages(messages: List[OpenAIMessage]) -> str: formatted_messages = [] for msg in messages: content = extract_openai_content(msg.content) formatted_messages.append({"role": msg.role, "content": content}) if tokenizer.chat_template: return tokenizer.apply_chat_template(formatted_messages, tokenize=False, add_generation_prompt=True) prompt = "" for msg in formatted_messages: role = msg["role"].capitalize() prompt += f"{role}: {msg['content']}\n" prompt += "Assistant: " return prompt def parse_thinking_response(text: str) -> tuple: thinking_pattern = r'(.*?)' thinking_matches = re.findall(thinking_pattern, text, re.DOTALL) if thinking_matches: thinking_text = "\n".join(thinking_matches).strip() answer_text = re.sub(thinking_pattern, '', text, flags=re.DOTALL).strip() return thinking_text, answer_text return None, text.strip() def generate_id(prefix: str = "msg") -> str: return f"{prefix}_{uuid.uuid4().hex[:24]}" # ============== ROOT ENDPOINTS ============== @app.get("/") async def root(): return { "status": "healthy", "model": MODEL_ID, "endpoints": { "openai": "/v1/chat/completions", "anthropic": "/anthropic/v1/messages" }, "base_urls": { "openai_sdk": "https://likhonsheikh-anthropic-compatible-api.hf.space/v1", "anthropic_sdk": "https://likhonsheikh-anthropic-compatible-api.hf.space/anthropic" }, "features": ["extended-thinking", "streaming", "dual-compatibility"], "log_file": LOG_FILE } @app.get("/logs") async def get_logs(lines: int = 100): try: with open(LOG_FILE, 'r') as f: all_lines = f.readlines() recent_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines return {"log_file": LOG_FILE, "total_lines": len(all_lines), "returned_lines": len(recent_lines), "logs": "".join(recent_lines)} except FileNotFoundError: return {"error": "Log file not found", "log_file": LOG_FILE} @app.get("/health") async def health(): return {"status": "ok", "model_loaded": model is not None, "log_file": LOG_FILE, "features": ["openai-compatible", "anthropic-compatible", "extended-thinking"]} # ============================================================ # OPENAI-COMPATIBLE ENDPOINTS (/v1) # ============================================================ @app.get("/v1/models") async def openai_list_models(): """List models (OpenAI format)""" return OpenAIModelList( data=[OpenAIModel(id="qwen2.5-coder-3b", created=int(time.time()), owned_by="qwen")] ) @app.post("/v1/chat/completions") async def openai_chat_completions( request: OpenAIChatRequest, authorization: Optional[str] = Header(None) ): """Chat completions (OpenAI format)""" chat_id = generate_id("chatcmpl") logger.info(f"[{chat_id}] OpenAI chat - model: {request.model}, max_tokens: {request.max_tokens}, stream: {request.stream}") try: prompt = format_openai_messages(request.messages) inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE) input_token_count = inputs.input_ids.shape[1] if request.stream: return await openai_stream_response(request, inputs, input_token_count, chat_id) gen_kwargs = { "max_new_tokens": request.max_tokens or 1024, "do_sample": request.temperature > 0 if request.temperature else False, "pad_token_id": tokenizer.eos_token_id, "eos_token_id": tokenizer.eos_token_id, } if request.temperature and request.temperature > 0: gen_kwargs["temperature"] = min(request.temperature, 1.0) if request.top_p: gen_kwargs["top_p"] = request.top_p if request.stop: stop_seqs = [request.stop] if isinstance(request.stop, str) else request.stop stop_ids = [] for seq in stop_seqs: tokens = tokenizer.encode(seq, add_special_tokens=False) if tokens: stop_ids.extend(tokens) if stop_ids: gen_kwargs["eos_token_id"] = list(set([tokenizer.eos_token_id] + stop_ids)) gen_start = time.time() with torch.no_grad(): outputs = model.generate(**inputs, **gen_kwargs) gen_time = time.time() - gen_start generated_tokens = outputs[0][input_token_count:] generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True) output_token_count = len(generated_tokens) finish_reason = "stop" if output_token_count >= (request.max_tokens or 1024): finish_reason = "length" logger.info(f"[{chat_id}] Generated {output_token_count} tokens in {gen_time:.2f}s") return OpenAIChatResponse( id=chat_id, created=int(time.time()), model=request.model, choices=[OpenAIChoice( index=0, message={"role": "assistant", "content": generated_text.strip()}, finish_reason=finish_reason )], usage=OpenAIUsage( prompt_tokens=input_token_count, completion_tokens=output_token_count, total_tokens=input_token_count + output_token_count ) ) except Exception as e: logger.error(f"[{chat_id}] Error: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) async def openai_stream_response(request: OpenAIChatRequest, inputs, input_token_count: int, chat_id: str): """Stream response in OpenAI format""" async def generate(): created = int(time.time()) # Initial chunk with role initial_chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": request.model, "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}] } yield f"data: {json.dumps(initial_chunk)}\n\n" streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) gen_kwargs = { **inputs, "max_new_tokens": request.max_tokens or 1024, "do_sample": request.temperature > 0 if request.temperature else False, "pad_token_id": tokenizer.eos_token_id, "eos_token_id": tokenizer.eos_token_id, "streamer": streamer, } if request.temperature and request.temperature > 0: gen_kwargs["temperature"] = min(request.temperature, 1.0) if request.top_p: gen_kwargs["top_p"] = request.top_p thread = Thread(target=model.generate, kwargs=gen_kwargs) thread.start() output_tokens = 0 for text in streamer: if text: output_tokens += len(tokenizer.encode(text, add_special_tokens=False)) chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": request.model, "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}] } yield f"data: {json.dumps(chunk)}\n\n" thread.join() # Final chunk finish_reason = "length" if output_tokens >= (request.max_tokens or 1024) else "stop" final_chunk = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": request.model, "choices": [{"index": 0, "delta": {}, "finish_reason": finish_reason}] } yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}) # ============================================================ # ANTHROPIC-COMPATIBLE ENDPOINTS (/anthropic) # ============================================================ @app.get("/anthropic/v1/models") async def anthropic_list_models(): """List models (Anthropic format)""" return { "object": "list", "data": [{ "id": "qwen2.5-coder-3b", "object": "model", "created": int(time.time()), "owned_by": "qwen", "display_name": "Qwen2.5 Coder 3B Instruct", "supports_thinking": True }] } @app.post("/anthropic/v1/messages", response_model=AnthropicMessageResponse) async def anthropic_create_message( request: AnthropicMessageRequest, x_api_key: Optional[str] = Header(None, alias="x-api-key"), anthropic_version: Optional[str] = Header(None, alias="anthropic-version"), anthropic_beta: Optional[str] = Header(None, alias="anthropic-beta") ): """Create message (Anthropic format with Extended Thinking)""" message_id = generate_id("msg") thinking_enabled = False budget_tokens = 1024 if request.thinking: thinking_enabled = request.thinking.type == "enabled" budget_tokens = request.thinking.budget_tokens or 1024 logger.info(f"[{message_id}] Anthropic msg - model: {request.model}, max_tokens: {request.max_tokens}, thinking: {thinking_enabled}") try: prompt = format_anthropic_messages(request.messages, request.system, thinking_enabled, budget_tokens) inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE) input_token_count = inputs.input_ids.shape[1] if request.stream: return await anthropic_stream_response(request, inputs, input_token_count, message_id, thinking_enabled, budget_tokens) total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0) gen_kwargs = { "max_new_tokens": total_max_tokens, "do_sample": request.temperature > 0 if request.temperature else False, "pad_token_id": tokenizer.eos_token_id, "eos_token_id": tokenizer.eos_token_id, } if request.temperature and request.temperature > 0: gen_kwargs["temperature"] = request.temperature if request.top_p: gen_kwargs["top_p"] = request.top_p if request.top_k: gen_kwargs["top_k"] = request.top_k gen_start = time.time() with torch.no_grad(): outputs = model.generate(**inputs, **gen_kwargs) gen_time = time.time() - gen_start generated_tokens = outputs[0][input_token_count:] generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True) output_token_count = len(generated_tokens) content_blocks = [] if thinking_enabled: thinking_text, answer_text = parse_thinking_response(generated_text) if thinking_text: content_blocks.append(AnthropicResponseThinkingBlock(type="thinking", thinking=thinking_text)) content_blocks.append(AnthropicResponseTextBlock(type="text", text=answer_text)) else: content_blocks.append(AnthropicResponseTextBlock(type="text", text=generated_text.strip())) stop_reason = "end_turn" if output_token_count >= total_max_tokens: stop_reason = "max_tokens" logger.info(f"[{message_id}] Generated {output_token_count} tokens in {gen_time:.2f}s") return AnthropicMessageResponse( id=message_id, content=content_blocks, model=request.model, stop_reason=stop_reason, usage=AnthropicUsage(input_tokens=input_token_count, output_tokens=output_token_count) ) except Exception as e: logger.error(f"[{message_id}] Error: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) async def anthropic_stream_response(request: AnthropicMessageRequest, inputs, input_token_count: int, message_id: str, thinking_enabled: bool, budget_tokens: int): """Stream response in Anthropic format""" async def generate(): start_event = { "type": "message_start", "message": { "id": message_id, "type": "message", "role": "assistant", "content": [], "model": request.model, "stop_reason": None, "stop_sequence": None, "usage": {"input_tokens": input_token_count, "output_tokens": 0} } } yield f"event: message_start\ndata: {json.dumps(start_event)}\n\n" yield f"event: ping\ndata: {json.dumps({'type': 'ping'})}\n\n" block_index = 0 in_thinking = False thinking_started = False text_block_started = False streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) total_max_tokens = request.max_tokens + (budget_tokens if thinking_enabled else 0) gen_kwargs = { **inputs, "max_new_tokens": total_max_tokens, "do_sample": request.temperature > 0 if request.temperature else False, "pad_token_id": tokenizer.eos_token_id, "eos_token_id": tokenizer.eos_token_id, "streamer": streamer, } if request.temperature and request.temperature > 0: gen_kwargs["temperature"] = request.temperature if request.top_p: gen_kwargs["top_p"] = request.top_p if request.top_k: gen_kwargs["top_k"] = request.top_k thread = Thread(target=model.generate, kwargs=gen_kwargs) thread.start() output_tokens = 0 accumulated_text = "" for text in streamer: if text: output_tokens += len(tokenizer.encode(text, add_special_tokens=False)) accumulated_text += text if thinking_enabled: if "" in accumulated_text and not thinking_started: thinking_started = True in_thinking = True yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': {'type': 'thinking', 'thinking': ''}})}\n\n" if in_thinking: clean_text = text.replace("", "").replace("", "") if clean_text: yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': block_index, 'delta': {'type': 'thinking_delta', 'thinking': clean_text}})}\n\n" if "" in accumulated_text: in_thinking = False yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" block_index += 1 text_block_started = True yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': {'type': 'text', 'text': ''}})}\n\n" elif text_block_started: yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': block_index, 'delta': {'type': 'text_delta', 'text': text}})}\n\n" else: if not text_block_started: text_block_started = True yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n" yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': text}})}\n\n" thread.join() yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" stop_reason = "max_tokens" if output_tokens >= total_max_tokens else "end_turn" yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': stop_reason}, 'usage': {'output_tokens': output_tokens}})}\n\n" yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" return StreamingResponse(generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}) @app.post("/anthropic/v1/messages/count_tokens", response_model=AnthropicTokenCountResponse) async def anthropic_count_tokens(request: AnthropicTokenCountRequest): thinking_enabled = request.thinking and request.thinking.type == "enabled" budget_tokens = request.thinking.budget_tokens if request.thinking else 1024 prompt = format_anthropic_messages(request.messages, request.system, thinking_enabled, budget_tokens) tokens = tokenizer.encode(prompt) return AnthropicTokenCountResponse(input_tokens=len(tokens)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860, log_config=None)