test-w / warp_api_server.py
letterm's picture
Upload 13 files
47d8ca8 verified
raw
history blame
12.3 kB
#!/usr/bin/env python3
"""
Warp API 服务器
整合所有模块,提供OpenAI兼容的API接口和Token管理Web界面
"""
import hmac
import json
import os
import secrets
import threading

from flask import Flask, request, jsonify, Response, stream_with_context, render_template_string, session, redirect, url_for
from loguru import logger

from config import Config
from utils import Utils
from api_service import ApiService
# Initialise logging.
Utils.setup_logging()

# Create the Flask application.
app = Flask(__name__)
# The secret key signs the session cookie that carries the
# 'admin_authenticated' flag, so it must not be a constant committed to the
# source: anyone who knows it can forge an authenticated admin session.
# Read it from the FLASK_SECRET_KEY environment variable; when that is unset,
# fall back to a random per-process key (sessions are then invalidated on
# every restart, and multi-process deployments must set the variable).
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)

# Create the API service used by the request handlers below.
api_service = ApiService()
def check_admin_auth():
    """Report whether the current session may use the admin interface.

    Returns ``True`` when ``Config.require_admin_auth()`` is falsy. Otherwise
    returns the session's ``'admin_authenticated'`` value, or ``False`` when
    that key is absent.
    """
    if Config.require_admin_auth():
        # Admin auth is enforced: rely on the flag stored in the session.
        return session.get('admin_authenticated', False)
    return True
def verify_admin_key(admin_key: str) -> bool:
    """Check a submitted admin key against the configured one.

    Args:
        admin_key: The key supplied by the client.

    Returns:
        ``True`` when admin authentication is not required, or when
        ``admin_key`` equals ``Config.get_admin_key()``; ``False`` otherwise
        (including when either value is not a string).
    """
    if not Config.require_admin_auth():
        return True

    expected_key = Config.get_admin_key()
    # The submitted key comes straight from a JSON body, so it may be any
    # JSON type; only strings can ever match.
    if not isinstance(admin_key, str) or not isinstance(expected_key, str):
        return False

    # Compare in constant time so response timing does not leak how much of
    # the key was correct. Encoding to bytes is required because
    # compare_digest only accepts ASCII-only str values; 'surrogatepass'
    # keeps odd client input from raising during encoding.
    return hmac.compare_digest(
        admin_key.encode('utf-8', 'surrogatepass'),
        expected_key.encode('utf-8', 'surrogatepass'),
    )
@app.before_request
def check_auth():
    """Authorise the incoming request before it reaches its view.

    Returns ``None`` to let the request proceed, or a response (redirect or
    JSON 401) that short-circuits it.
    """
    endpoint = request.endpoint

    # The login page and the login handler must stay reachable without auth.
    if endpoint in ('admin_login', 'admin_auth'):
        return None

    # Management endpoints require an authenticated admin session.
    admin_protected = (
        'index',
        'token_status',
        'add_tokens',
        'batch_login',
        'refresh_tokens',
        'remove_tokens',
        'export_tokens',
    )
    if endpoint in admin_protected and not check_admin_auth():
        if endpoint == 'index':
            # Browsers hitting the UI are sent to the login page.
            return redirect(url_for('admin_login'))
        return jsonify({"error": "需要管理员认证"}), 401

    # OpenAI-compatible endpoints are guarded by the Authorization header.
    if endpoint in ('get_models', 'chat_completions'):
        authorization = request.headers.get('Authorization')
        if not api_service.authenticate_request(authorization):
            return jsonify({"error": "未授权访问"}), 401

    return None
@app.route('/admin/login')
def admin_login():
    """Serve the admin login page.

    Redirects to the management page instead when admin authentication is
    not required or the session is already authenticated.
    """
    if not Config.require_admin_auth() or check_admin_auth():
        return redirect(url_for('index'))

    from web_template import get_admin_login_template
    login_page = get_admin_login_template()
    return render_template_string(login_page)
@app.route('/admin/auth', methods=['POST'])
def admin_auth():
    """Handle an admin login attempt.

    Expects a JSON object with an ``admin_key`` field. On success the
    session is flagged as authenticated.

    Returns:
        A JSON response with a ``success`` flag; HTTP 400 when the body is
        not a JSON object, HTTP 500 on an unexpected error.
    """
    if not Config.require_admin_auth():
        return jsonify({"success": True, "redirect": "/"})
    try:
        # silent=True yields None instead of aborting on a missing or
        # malformed body, so bad input is reported as a client error (400)
        # rather than falling into the generic 500 handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "无效的请求数据"}), 400

        admin_key = data.get('admin_key', '')
        if verify_admin_key(admin_key):
            session['admin_authenticated'] = True
            logger.info("🔐 管理员认证成功")
            return jsonify({"success": True, "message": "认证成功", "redirect": "/"})

        logger.warning("⚠️ 管理员认证失败")
        return jsonify({"success": False, "message": "管理员密钥错误"})
    except Exception as e:
        logger.error(f"❌ 管理员认证出错: {e}")
        return jsonify({"success": False, "message": "认证失败"}), 500
@app.route('/admin/logout', methods=['POST'])
def admin_logout():
    """Log the admin out by dropping the session's authentication flag."""
    # pop() with a default makes logging out when not logged in a no-op.
    session.pop('admin_authenticated', None)
    payload = {"success": True, "message": "已登出"}
    return jsonify(payload)
@app.route('/')
def index():
    """Render the management page."""
    from web_template import get_html_template
    page_source = get_html_template()
    return render_template_string(page_source)
@app.route(f"/{Config.OPENAI_API_VERSION}/models", methods=['GET'])
def get_models():
    """Return the model list from the API service as JSON."""
    models_payload = api_service.get_models()
    return jsonify(models_payload)
@app.route(f"/{Config.OPENAI_API_VERSION}/chat/completions", methods=['POST'])
def chat_completions():
    """Chat completion endpoint.

    Expects a JSON object containing at least ``model``; an optional
    ``stream`` flag selects a ``text/event-stream`` response.

    Returns:
        A streaming or JSON response; HTTP 400 for a missing, malformed, or
        non-object body or an unsupported model; HTTP 500 on an unexpected
        error.
    """
    try:
        # silent=True returns None for a missing or malformed body instead
        # of raising, so the 400 below is actually reachable (otherwise the
        # abort is swallowed by the generic handler and reported as a 500).
        request_data = request.get_json(silent=True)
        if not request_data or not isinstance(request_data, dict):
            return jsonify({"error": "无效的请求数据"}), 400

        # Reject models the mapper does not know about.
        model = request_data.get("model")
        from model_mapper import ModelMapper
        if not ModelMapper.is_valid_model(model):
            return jsonify({"error": f"不支持的模型: {model}"}), 400

        if request_data.get("stream", False):
            # Streaming response: relay chunks as the service produces them.
            def generate():
                yield from api_service.chat_completion(request_data, stream=True)

            return Response(
                stream_with_context(generate()),
                content_type='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive',
                    'Access-Control-Allow-Origin': '*'
                }
            )

        # Non-streaming response: only the first item yielded is used; an
        # empty body is sent when the service yields nothing.
        response_body = next(
            iter(api_service.chat_completion(request_data, stream=False)), ""
        )
        if isinstance(response_body, dict):
            response_body = json.dumps(response_body)
        return Response(response_body, content_type='application/json')
    except Exception as e:
        logger.error(f"❌ 处理聊天请求时出错: {e}")
        return jsonify({"error": f"服务器内部错误: {str(e)}"}), 500
@app.route('/token/status', methods=['GET'])
def token_status():
    """Return the token status reported by the API service as JSON.

    Returns:
        The service's status payload, or a ``success: False`` JSON body with
        HTTP 500 when the service raises.
    """
    try:
        return jsonify(api_service.get_token_status())
    except Exception as e:
        # Log the failure as well as returning it, matching the other
        # handlers in this file that log before responding with a 500.
        logger.error(f"❌ 获取token状态时出错: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/token/add', methods=['POST'])
def add_tokens():
    """Add tokens supplied in the JSON body's ``tokens`` field.

    Returns:
        The API service's result as JSON; HTTP 400 when the body is not a
        JSON object; HTTP 500 when the service raises.
    """
    try:
        # silent=True turns a missing or malformed body into None so it can
        # be reported as a client error instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "无效的请求数据"}), 400

        tokens = data.get('tokens', [])
        return jsonify(api_service.add_tokens(tokens))
    except Exception as e:
        logger.error(f"❌ 添加token时出错: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/token/remove', methods=['POST'])
def remove_tokens():
    """Remove the refresh token named in the JSON body's ``refresh_token`` field.

    Returns:
        The API service's result as JSON; HTTP 400 when the body is not a
        JSON object; HTTP 500 when the service raises.
    """
    try:
        # silent=True turns a missing or malformed body into None so it can
        # be reported as a client error instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "无效的请求数据"}), 400

        refresh_token = data.get('refresh_token')
        result = api_service.remove_refresh_token(refresh_token=refresh_token)
        return jsonify(result)
    except Exception as e:
        logger.error(f"❌ 删除token时出错: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/token/refresh', methods=['POST'])
def refresh_tokens():
    """Ask the API service to refresh all tokens and return its result as JSON.

    Returns:
        The service's result, or a ``success: False`` JSON body with HTTP 500
        when the service raises.
    """
    try:
        return jsonify(api_service.refresh_all_tokens())
    except Exception as e:
        # Log the failure as well as returning it, matching the other
        # handlers in this file that log before responding with a 500.
        logger.error(f"❌ 刷新token时出错: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/token/export', methods=['POST'])
def export_tokens():
    """Export refresh tokens.

    The JSON body's ``super_admin_key`` field is passed to the API service,
    which performs the export.

    Returns:
        The API service's result as JSON; HTTP 400 when the body is not a
        JSON object; HTTP 500 when the service raises.
    """
    try:
        # silent=True turns a missing or malformed body into None so it can
        # be reported as a client error instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "无效的请求数据"}), 400

        super_admin_key = data.get('super_admin_key', '')
        return jsonify(api_service.export_refresh_tokens(super_admin_key))
    except Exception as e:
        logger.error(f"❌ 导出token时出错: {e}")
        return jsonify({"success": False, "message": str(e)}), 500
@app.route('/login/batch', methods=['POST'])
def batch_login():
    """Fetch refresh tokens in bulk for a list of email/URL pairs.

    Expects a JSON object with ``email_url_pairs`` (a list of objects with
    ``email`` and ``url`` strings) and an optional ``max_workers`` integer
    (default 5).

    Returns:
        The API service's result as JSON; HTTP 400 when the body is not a
        JSON object or contains no usable pair; HTTP 500 on an unexpected
        error.
    """
    try:
        # silent=True turns a missing or malformed body into None so it can
        # be reported as a client error instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': '无效的请求数据'}), 400

        email_url_pairs = data.get('email_url_pairs', [])
        if not isinstance(email_url_pairs, list):
            email_url_pairs = []

        # max_workers is client-supplied; fall back to the default unless it
        # is a positive integer (bool is excluded because it subclasses int).
        max_workers = data.get('max_workers', 5)
        if isinstance(max_workers, bool) or not isinstance(max_workers, int) or max_workers < 1:
            max_workers = 5

        # Convert the pairs to an {email: url} dict, skipping entries that
        # are malformed or blank rather than failing the whole request.
        email_url_dict = {}
        for pair in email_url_pairs:
            if not isinstance(pair, dict):
                continue
            email = pair.get('email', '')
            url = pair.get('url', '')
            if not isinstance(email, str) or not isinstance(url, str):
                continue
            email, url = email.strip(), url.strip()
            if email and url:
                email_url_dict[email] = url

        if not email_url_dict:
            return jsonify({'success': False, 'message': '没有有效的邮箱和URL对'}), 400

        result = api_service.batch_get_refresh_tokens(email_url_dict, max_workers)
        return jsonify(result)
    except Exception as e:
        logger.error(f"❌ 批量登录时出错: {e}")
        return jsonify({'success': False, 'message': f'服务器错误: {str(e)}'}), 500
@app.route('/health', methods=['GET'])
def health_check():
    """Health-check endpoint: report status, current timestamp, and version."""
    payload = {
        "status": "healthy",
        "timestamp": Utils.get_current_timestamp(),
        "version": "2.0.0",
    }
    return jsonify(payload)
def compile_protobuf_at_startup():
    """Validate the protobuf module at startup and log the outcome.

    Calls ``ProtobufManager.validate_protobuf_module()``. Failures and
    exceptions are only logged, so the caller continues either way.
    """
    logger.info("🔧 开始编译Protobuf文件...")
    try:
        from protobuf_manager import ProtobufManager
        if not ProtobufManager.validate_protobuf_module():
            logger.warning("⚠️ Protobuf模块验证失败,但服务仍将启动")
            return
        logger.success("✅ Protobuf模块验证成功")
    except Exception as e:
        logger.error(f"❌ 编译Protobuf时出错: {e}")
def _mask_token(token: str) -> str:
    """Return a shortened form of *token* that is safe to print.

    Tokens longer than 28 characters keep their first 20 and last 8
    characters. Shorter tokens reveal at most their first 4 characters, so a
    short secret is never printed in full.
    """
    if len(token) > 28:
        return f"{token[:20]}...{token[-8:]}"
    visible = min(4, len(token) // 4)
    return f"{token[:visible]}..."


def _print_startup_banner():
    """Print the startup banner: endpoints, auth settings, and env tokens."""
    base_url = f"http://localhost:{Config.SERVER_PORT}"
    print("=" * 80)
    print("🚀 启动 Warp API 服务器 v2.0")
    print("=" * 80)
    print(f"📡 服务器地址: {base_url}")
    print(f"🔧 管理界面: {base_url}")
    print(f"📚 模型端点: {base_url}/{Config.OPENAI_API_VERSION}/models")
    print(f"💬 聊天端点: {base_url}/{Config.OPENAI_API_VERSION}/chat/completions")
    print(f"❤️ 健康检查: {base_url}/health")
    print(f"📝 文件日志: {'启用' if Config.enable_file_logging() else '禁用'}")

    # Security configuration.
    if Config.require_admin_auth():
        print("🔐 管理员认证: 已启用 (需要 ADMIN_KEY)")
    else:
        print("⚠️ 管理员认证: 未启用 (可通过 ADMIN_KEY 环境变量启用)")
    if Config.require_super_admin_auth():
        print("🔒 超级管理员认证: 已启用 (导出功能需要 SUPER_ADMIN_KEY)")
    else:
        print("⚠️ 超级管理员认证: 未启用 (可通过 SUPER_ADMIN_KEY 环境变量启用)")

    # Refresh tokens supplied through the environment (printed masked).
    env_tokens = Config.get_refresh_tokens()
    if env_tokens:
        print(f"🎯 环境变量Token: 已设置 {len(env_tokens)} 个")
        for i, token in enumerate(env_tokens, 1):
            print(f" #{i}: {_mask_token(token)}")
    else:
        print("⚠️ 环境变量Token: 未设置 (请设置 WARP_REFRESH_TOKEN)")
        print("💡 支持多个token,使用分号(;)分割:token1;token2;token3")
    print("-" * 80)


def main():
    """Entry point: print the banner, start background services, run Flask.

    Background services are stopped in a ``finally`` clause, so cleanup runs
    on normal exit, Ctrl-C, and startup failure alike.
    """
    # The instance is not used below; the constructor call is kept in case it
    # performs initialisation. TODO confirm and drop if it does not.
    Config()

    _print_startup_banner()

    try:
        # Validate protobuf support.
        compile_protobuf_at_startup()

        # Start background services.
        logger.info("🔄 启动后台服务...")
        api_service.start_services()
        logger.success("✅ 后台服务启动完成")
        logger.info("-" * 80)
        logger.success("🌟 服务器启动完成!")
        if Config.require_admin_auth():
            logger.info(f"🔐 访问管理界面需要管理员密钥: http://localhost:{Config.SERVER_PORT}")
        else:
            logger.info(f"🌐 访问管理界面: http://localhost:{Config.SERVER_PORT}")
        logger.info("-" * 80)

        # Start the Flask server (blocks until shutdown).
        app.run(
            host=Config.SERVER_HOST,
            port=Config.SERVER_PORT,
            debug=False,
            threaded=True
        )
    except KeyboardInterrupt:
        logger.info("🛑 接收到停止信号")
    except Exception as e:
        logger.error(f"❌ 服务器启动失败: {e}")
    finally:
        logger.info("🧹 清理资源...")
        api_service.stop_services()
        logger.info("👋 服务器已停止")
if __name__ == "__main__":
main()