test-w / api_service.py
letterm's picture
Update api_service.py
ae385fc verified
raw
history blame
12.1 kB
"""
API service module.
Handles all API request logic, including chat completions and token management.
"""
import json
import time
from datetime import datetime
from typing import List, Dict, Any, Generator
from loguru import logger
from config import Config
from utils import Utils
from token_manager import MultiTokenManager
from warp_client import WarpClient
from request_converter import RequestConverter
from model_mapper import ModelMapper
class ApiService:
    """API service layer: owns the token manager and Warp client and
    implements all business logic behind the HTTP endpoints.

    Responsibilities visible here: request authentication, model listing,
    chat completion (streaming and non-streaming), and refresh-token
    management (status/add/remove/refresh/export/batch-acquire).
    """

    def __init__(self):
        # Token manager maintains the pool of Warp refresh tokens.
        self.token_manager = MultiTokenManager()
        # Warp client performs the actual upstream requests using that pool.
        self.warp_client = WarpClient(self.token_manager)
        logger.info("🚀 ApiService初始化完成")

    def authenticate_request(self, auth_header: str) -> bool:
        """Validate an incoming request's Authorization header.

        Returns True only when a Bearer token is present and passes
        Utils.validate_api_key.
        """
        if not auth_header:
            return False
        token = Utils.extract_bearer_token(auth_header)
        if not token:
            return False
        return Utils.validate_api_key(token)

    def get_models(self) -> Dict[str, Any]:
        """Return the supported model list in OpenAI `/v1/models` format."""
        models = [
            {
                "id": model_name,
                "object": "model",
                "created": Utils.get_current_timestamp(),
                "owned_by": "warp-proxy",
            }
            for model_name in ModelMapper.get_available_models()
        ]
        return {
            "object": "list",
            "data": models,
        }

    def chat_completion(self, request_data: Dict[str, Any], stream: bool = False) -> Generator[Any, None, None]:
        """Handle a chat-completion request.

        Yields SSE-formatted ``str`` lines when *stream* is true; otherwise
        yields a single complete-response ``dict``. Error responses are
        yielded as dicts in both modes, so callers must accept both shapes
        (hence ``Generator[Any, ...]`` — the original ``str`` annotation was
        inaccurate).
        """
        request_id = Utils.generate_request_id()
        # Parse the OpenAI-style request and map the model name to Warp's.
        openai_request = RequestConverter.parse_openai_request(request_data)
        model = ModelMapper.get_warp_model(openai_request.model)
        messages = openai_request.messages
        logger.info(f"🎯 开始处理聊天请求 [ID: {request_id[:8]}] [模型: {model}] [流式: {stream}]")
        start_time = time.time()
        try:
            # Build the protobuf payload for the upstream Warp service.
            protobuf_data = self.warp_client.create_protobuf_data(messages, model)
            if not protobuf_data:
                error_msg = "创建请求数据失败"
                logger.error(f"❌ {error_msg} [ID: {request_id[:8]}]")
                yield self._create_error_response(error_msg, request_id)
                return
            # Send the request and accumulate/forward the response chunks.
            response_chunks = 0
            total_content = ""
            logger.success(f"🚀 开始接收响应 [ID: {request_id[:8]}]")
            for chunk_text in self.warp_client.send_request(protobuf_data):
                if not chunk_text:
                    continue  # skip empty keep-alive chunks
                response_chunks += 1
                total_content += chunk_text
                logger.debug(f"📦 响应块 #{response_chunks} [ID: {request_id[:8]}] [长度: {len(chunk_text)}]")
                if stream:
                    # Stream each chunk as a Server-Sent Events data line;
                    # pass the resolved model so the response echoes it.
                    chunk_response = self._create_stream_chunk(chunk_text, request_id, model)
                    yield f"data: {json.dumps(chunk_response)}\n\n"
            duration = time.time() - start_time
            if stream:
                # Terminate the stream: final finish_reason chunk + [DONE].
                final_chunk = self._create_stream_end_chunk(request_id, model)
                yield f"data: {json.dumps(final_chunk)}\n\n"
                yield "data: [DONE]\n\n"
                logger.success(f"✅ 流式响应完成 [ID: {request_id[:8]}] [块数: {response_chunks}] [耗时: {duration:.2f}s]")
            else:
                # Non-streaming: yield one complete response dict.
                yield self._create_complete_response(total_content, request_id, model)
                logger.success(f"✅ 完整响应完成 [ID: {request_id[:8]}] [长度: {len(total_content)}] [耗时: {duration:.2f}s]")
        except Exception as e:
            logger.error(f"❌ 聊天请求处理失败 [ID: {request_id[:8]}]: {e}")
            yield self._create_error_response(f"服务器内部错误: {str(e)}", request_id)

    def _create_stream_chunk(self, content: str, request_id: str,
                             model: str = "gemini-2.0-flash") -> Dict[str, Any]:
        """Build one OpenAI-compatible streaming delta chunk.

        *model* defaults to the previously hard-coded value for backward
        compatibility; callers should pass the actual requested model.
        """
        return {
            "id": f"chatcmpl-{request_id}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {"content": content},
                "finish_reason": None
            }]
        }

    def _create_stream_end_chunk(self, request_id: str,
                                 model: str = "gemini-2.0-flash") -> Dict[str, Any]:
        """Build the terminal streaming chunk (empty delta, finish_reason=stop)."""
        return {
            "id": f"chatcmpl-{request_id}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {},
                "finish_reason": "stop"
            }]
        }

    def _create_complete_response(self, content: str, request_id: str,
                                  model: str = "gemini-2.0-flash") -> Dict[str, Any]:
        """Build a full (non-streaming) chat.completion response.

        Token usage is not tracked by this proxy, so usage counts are zero.
        """
        return {
            "id": f"chatcmpl-{request_id}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": content
                },
                "finish_reason": "stop"
            }],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        }

    def _create_error_response(self, error_message: str, request_id: str) -> Dict[str, Any]:
        """Build an OpenAI-style error payload."""
        return {
            "error": {
                "message": error_message,
                "type": "api_error",
                "code": "internal_error"
            },
            "id": request_id
        }

    def get_token_status(self) -> Dict[str, Any]:
        """Return current token-pool status, wrapped in a success envelope."""
        try:
            status = self.token_manager.get_token_status()
            return {"success": True, **status}
        except Exception as e:
            logger.error(f"❌ 获取Token状态失败: {e}")
            return {"success": False, "message": str(e)}

    def add_tokens(self, tokens: List[str]) -> Dict[str, Any]:
        """Add refresh tokens to the manager; reports how many were valid."""
        try:
            success = self.token_manager.add_refresh_tokens(tokens)
            if success:
                # Re-run format validation to report the count actually added.
                valid_tokens = [t for t in tokens if Utils.validate_refresh_token_format(t)]
                return {
                    "success": True,
                    "message": "Token添加成功",
                    "added_tokens": len(valid_tokens)
                }
            return {"success": False, "message": "没有有效的Token可添加"}
        except Exception as e:
            logger.error(f"❌ 添加Token失败: {e}")
            return {"success": False, "message": str(e)}

    def remove_refresh_token(self, refresh_token: str) -> Dict[str, Any]:
        """Remove one refresh token from the manager."""
        try:
            success = self.token_manager.remove_refresh_token(refresh_token)
            if success:
                return {"success": True, "message": "Token删除成功"}
            return {"success": False, "message": "Token不存在"}
        except Exception as e:
            logger.error(f"❌ 删除Token失败: {e}")
            return {"success": False, "message": str(e)}

    def refresh_all_tokens(self) -> Dict[str, Any]:
        """Kick off a refresh of every managed token."""
        try:
            self.token_manager.refresh_all_tokens()
            return {"success": True, "message": "Token刷新已开始"}
        except Exception as e:
            logger.error(f"❌ 刷新Token失败: {e}")
            return {"success": False, "message": str(e)}

    def export_refresh_tokens(self, super_admin_key: str) -> Dict[str, Any]:
        """Export all refresh tokens as a ';'-joined string.

        Requires the super-admin key when Config.require_super_admin_auth()
        is enabled. Also suggests a timestamped export filename.
        """
        try:
            # Gate the export behind the super-admin key if configured.
            if Config.require_super_admin_auth():
                if not super_admin_key or super_admin_key != Config.get_super_admin_key():
                    return {"success": False, "message": "超级管理员密钥验证失败"}
            # Snapshot the token set under the manager's lock.
            with self.token_manager.token_lock:
                refresh_tokens = list(self.token_manager.tokens.keys())
            if not refresh_tokens:
                return {"success": False, "message": "没有可导出的token"}
            token_string = ";".join(refresh_tokens)
            # Timestamped filename suggestion for the client-side download.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            suggested_filename = f"refresh_tokens_export_{timestamp}.txt"
            logger.info(f"🔒 超级管理员请求导出 {len(refresh_tokens)} 个refresh token")
            return {
                "success": True,
                "message": f"准备导出 {len(refresh_tokens)} 个token",
                "content": token_string,
                "suggested_filename": suggested_filename,
                "token_count": len(refresh_tokens)
            }
        except Exception as e:
            logger.error(f"❌ 准备导出refresh token失败: {e}")
            return {"success": False, "message": f"导出失败: {str(e)}"}

    def batch_get_refresh_tokens(self, email_url_dict: Dict[str, str], max_workers: int = 5) -> Dict[str, Any]:
        """Batch-acquire refresh tokens from login URLs and register them."""
        try:
            # Local import: login_client is only needed for this admin flow.
            from login_client import LoginClient
            login_client = LoginClient()
            # Pass token_manager so users are created as soon as each
            # refresh_token is obtained.
            results = login_client.batch_process_emails(email_url_dict, max_workers, self.token_manager)
            # Collect the successfully obtained tokens and add them in bulk.
            valid_tokens = [
                result['refresh_token']
                for result in results.values()
                if result.get('refresh_token')
            ]
            if valid_tokens:
                self.token_manager.add_refresh_tokens(valid_tokens)
                logger.info(f"✅ 批量获取并添加了 {len(valid_tokens)} 个有效token")
            return {
                'success': True,
                'results': results,
                'total_count': len(email_url_dict),
                'success_count': len(valid_tokens)
            }
        except Exception as e:
            logger.error(f"❌ 批量获取refresh token失败: {e}")
            return {'success': False, 'message': str(e)}

    def start_services(self):
        """Start background services (token auto-refresh). Best-effort: logs failures."""
        try:
            self.token_manager.start_auto_refresh()
            logger.success("✅ 后台服务启动成功")
        except Exception as e:
            logger.error(f"❌ 启动后台服务失败: {e}")

    def stop_services(self):
        """Stop background services (token auto-refresh). Best-effort: logs failures."""
        try:
            self.token_manager.stop_auto_refresh()
            logger.info("⏹️ 后台服务已停止")
        except Exception as e:
            logger.error(f"❌ 停止后台服务失败: {e}")