File size: 4,107 Bytes
c50496f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | """
统一的健康检查(Hi消息)处理模块
提供对OpenAI、Gemini和Anthropic格式的Hi消息的解析和响应
"""
import time
from typing import Any, Dict, List
# ==================== Hi消息检测 ====================
def is_health_check_request(request_data: dict, format: str = "openai") -> bool:
"""
检查是否是健康检查请求(Hi消息)
Args:
request_data: 请求数据
format: 请求格式("openai"、"gemini" 或 "anthropic")
Returns:
是否是健康检查请求
"""
if format == "openai":
# OpenAI格式健康检查: {"messages": [{"role": "user", "content": "Hi"}]}
messages = request_data.get("messages", [])
if len(messages) == 1:
msg = messages[0]
if msg.get("role") == "user" and msg.get("content") == "Hi":
return True
elif format == "gemini":
# Gemini格式健康检查: {"contents": [{"role": "user", "parts": [{"text": "Hi"}]}]}
contents = request_data.get("contents", [])
if len(contents) == 1:
content = contents[0]
if (content.get("role") == "user" and
content.get("parts", [{}])[0].get("text") == "Hi"):
return True
elif format == "anthropic":
# Anthropic格式健康检查: {"messages": [{"role": "user", "content": "Hi"}]}
messages = request_data.get("messages", [])
if (len(messages) == 1
and messages[0].get("role") == "user"
and messages[0].get("content") == "Hi"):
return True
return False
def is_health_check_message(messages: List[Dict[str, Any]]) -> bool:
"""
直接检查消息列表是否为健康检查消息(Anthropic专用)
这是一个便捷函数,用于已经提取出消息列表的场景。
Args:
messages: 消息列表
Returns:
是否为健康检查消息
"""
return (
len(messages) == 1
and messages[0].get("role") == "user"
and messages[0].get("content") == "Hi"
)
# ==================== Hi消息响应生成 ====================
def create_health_check_response(format: str = "openai", **kwargs) -> dict:
"""
创建健康检查响应
Args:
format: 响应格式("openai"、"gemini" 或 "anthropic")
**kwargs: 格式特定的额外参数
- model: 模型名称(anthropic格式需要)
- message_id: 消息ID(anthropic格式需要)
Returns:
健康检查响应字典
"""
if format == "openai":
# OpenAI格式响应
return {
"id": "healthcheck",
"object": "chat.completion",
"created": int(time.time()),
"model": "healthcheck",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "API is working"
},
"finish_reason": "stop"
}]
}
elif format == "gemini":
# Gemini格式响应
return {
"candidates": [{
"content": {
"parts": [{"text": "gcli2api工作中"}],
"role": "model"
},
"finishReason": "STOP",
"index": 0,
}]
}
elif format == "anthropic":
# Anthropic格式响应
model = kwargs.get("model", "claude-unknown")
message_id = kwargs.get("message_id", "msg_healthcheck")
return {
"id": message_id,
"type": "message",
"role": "assistant",
"model": str(model),
"content": [{"type": "text", "text": "antigravity Anthropic Messages 正常工作中"}],
"stop_reason": "end_turn",
"stop_sequence": None,
"usage": {"input_tokens": 0, "output_tokens": 0},
}
# 未知格式返回空字典
return {}
|