#!/usr/bin/env python3 """ Warp API 服务器 整合所有模块,提供OpenAI兼容的API接口和Token管理Web界面 """ import json import threading from flask import Flask, request, jsonify, Response, stream_with_context, render_template_string, session, redirect, url_for from loguru import logger from config import Config from utils import Utils from api_service import ApiService # 初始化日志 Utils.setup_logging() # 创建Flask应用 app = Flask(__name__) app.secret_key = 'warp-api-server-secret-key-2024' # 用于session管理 # 创建API服务 api_service = ApiService() def check_admin_auth(): """检查管理员认证""" if not Config.require_admin_auth(): return True # 检查session中的管理员认证状态 return session.get('admin_authenticated', False) def verify_admin_key(admin_key: str) -> bool: """验证管理员密钥""" if not Config.require_admin_auth(): return True return admin_key == Config.get_admin_key() @app.before_request def check_auth(): """请求前鉴权检查""" # 跳过管理员认证的端点 admin_auth_endpoints = ['admin_login', 'admin_auth'] if request.endpoint in admin_auth_endpoints: return # 管理界面相关端点需要管理员认证 admin_endpoints = ['index', 'token_status', 'add_tokens', 'batch_login', 'refresh_tokens', 'remove_tokens', 'export_tokens'] if request.endpoint in admin_endpoints: if not check_admin_auth(): if request.endpoint == 'index': return redirect(url_for('admin_login')) else: return jsonify({"error": "需要管理员认证"}), 401 # API端点需要API密钥认证 api_endpoints = ['get_models', 'chat_completions'] if request.endpoint in api_endpoints: auth_header = request.headers.get('Authorization') if not api_service.authenticate_request(auth_header): return jsonify({"error": "未授权访问"}), 401 @app.route('/admin/login') def admin_login(): """管理员登录页面""" if not Config.require_admin_auth(): return redirect(url_for('index')) if check_admin_auth(): return redirect(url_for('index')) from web_template import get_admin_login_template return render_template_string(get_admin_login_template()) @app.route('/admin/auth', methods=['POST']) def admin_auth(): """管理员认证处理""" if not Config.require_admin_auth(): return jsonify({"success": True, "redirect": "/"}) try: data = request.get_json() admin_key = data.get('admin_key', '') if verify_admin_key(admin_key): session['admin_authenticated'] = True logger.info("🔐 管理员认证成功") return jsonify({"success": True, "message": "认证成功", "redirect": "/"}) else: logger.warning("⚠️ 管理员认证失败") return jsonify({"success": False, "message": "管理员密钥错误"}) except Exception as e: logger.error(f"❌ 管理员认证出错: {e}") return jsonify({"success": False, "message": "认证失败"}), 500 @app.route('/admin/logout', methods=['POST']) def admin_logout(): """管理员登出""" session.pop('admin_authenticated', None) return jsonify({"success": True, "message": "已登出"}) @app.route('/') def index(): """管理页面""" from web_template import get_html_template return render_template_string(get_html_template()) @app.route(f"/{Config.OPENAI_API_VERSION}/models", methods=['GET']) def get_models(): """获取模型列表""" return jsonify(api_service.get_models()) @app.route(f"/{Config.OPENAI_API_VERSION}/chat/completions", methods=['POST']) def chat_completions(): """聊天完成端点""" try: request_data = request.get_json() if not request_data: return jsonify({"error": "无效的请求数据"}), 400 # 检查模型是否有效 model = request_data.get("model") from model_mapper import ModelMapper if not ModelMapper.is_valid_model(model): return jsonify({"error": f"不支持的模型: {model}"}), 400 stream = request_data.get("stream", False) if stream: # 流式响应 def generate(): for chunk in api_service.chat_completion(request_data, stream=True): yield chunk return Response( stream_with_context(generate()), content_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*' } ) else: # 非流式响应 response_text = "" for chunk in api_service.chat_completion(request_data, stream=False): response_text = chunk break return Response( json.dumps(response_text) if isinstance(response_text, dict) else response_text, content_type='application/json' ) except Exception as e: logger.error(f"❌ 处理聊天请求时出错: {e}") return jsonify({"error": f"服务器内部错误: {str(e)}"}), 500 @app.route('/token/status', methods=['GET']) def token_status(): """获取token状态""" try: status = api_service.get_token_status() return jsonify(status) except Exception as e: return jsonify({"success": False, "message": str(e)}), 500 @app.route('/token/add', methods=['POST']) def add_tokens(): """添加token""" try: data = request.get_json() tokens = data.get('tokens', []) result = api_service.add_tokens(tokens) return jsonify(result) except Exception as e: return jsonify({"success": False, "message": str(e)}), 500 @app.route('/token/remove', methods=['POST']) def remove_tokens(): """删除token""" try: data = request.get_json() refresh_token = data.get('refresh_token') result = api_service.remove_refresh_token(refresh_token=refresh_token) return jsonify(result) except Exception as e: return jsonify({"success": False, "message": str(e)}), 500 @app.route('/token/refresh', methods=['POST']) def refresh_tokens(): """刷新所有token""" try: result = api_service.refresh_all_tokens() return jsonify(result) except Exception as e: return jsonify({"success": False, "message": str(e)}), 500 @app.route('/token/export', methods=['POST']) def export_tokens(): """导出refresh token(需要超级管理员密钥验证)""" try: data = request.get_json() super_admin_key = data.get('super_admin_key', '') result = api_service.export_refresh_tokens(super_admin_key) return jsonify(result) except Exception as e: logger.error(f"❌ 导出token时出错: {e}") return jsonify({"success": False, "message": str(e)}), 500 @app.route('/login/batch', methods=['POST']) def batch_login(): """批量获取refresh token""" try: data = request.get_json() email_url_pairs = data.get('email_url_pairs', []) max_workers = data.get('max_workers', 5) # 转换为字典格式 email_url_dict = {} for pair in email_url_pairs: email = pair.get('email', '').strip() url = pair.get('url', '').strip() if email and url: email_url_dict[email] = url if not email_url_dict: return jsonify({'success': False, 'message': '没有有效的邮箱和URL对'}), 400 result = api_service.batch_get_refresh_tokens(email_url_dict, max_workers) return jsonify(result) except Exception as e: logger.error(f"❌ 批量登录时出错: {e}") return jsonify({'success': False, 'message': f'服务器错误: {str(e)}'}), 500 @app.route('/health', methods=['GET']) def health_check(): """健康检查端点""" return jsonify({ "status": "healthy", "timestamp": Utils.get_current_timestamp(), "version": "2.0.0" }) def compile_protobuf_at_startup(): """启动时编译protobuf""" logger.info("🔧 开始编译Protobuf文件...") try: from protobuf_manager import ProtobufManager success = ProtobufManager.validate_protobuf_module() if success: logger.success("✅ Protobuf模块验证成功") else: logger.warning("⚠️ Protobuf模块验证失败,但服务仍将启动") except Exception as e: logger.error(f"❌ 编译Protobuf时出错: {e}") def main(): """主函数""" config = Config() print("=" * 80) print("🚀 启动 Warp API 服务器 v2.0") print("=" * 80) print(f"📡 服务器地址: http://localhost:{Config.SERVER_PORT}") print(f"🔧 管理界面: http://localhost:{Config.SERVER_PORT}") print(f"📚 模型端点: http://localhost:{Config.SERVER_PORT}/{Config.OPENAI_API_VERSION}/models") print(f"💬 聊天端点: http://localhost:{Config.SERVER_PORT}/{Config.OPENAI_API_VERSION}/chat/completions") print(f"❤️ 健康检查: http://localhost:{Config.SERVER_PORT}/health") print(f"📝 文件日志: {'启用' if Config.enable_file_logging() else '禁用'}") # 显示安全配置信息 if Config.require_admin_auth(): print(f"🔐 管理员认证: 已启用 (需要 ADMIN_KEY)") else: print("⚠️ 管理员认证: 未启用 (可通过 ADMIN_KEY 环境变量启用)") if Config.require_super_admin_auth(): print(f"🔒 超级管理员认证: 已启用 (导出功能需要 SUPER_ADMIN_KEY)") else: print("⚠️ 超级管理员认证: 未启用 (可通过 SUPER_ADMIN_KEY 环境变量启用)") # 显示环境变量token信息 env_tokens = Config.get_refresh_tokens() if env_tokens: print(f"🎯 环境变量Token: 已设置 {len(env_tokens)} 个") for i, token in enumerate(env_tokens, 1): print(f" #{i}: {token[:20]}...{token[-8:] if len(token) > 28 else ''}") else: print("⚠️ 环境变量Token: 未设置 (请设置 WARP_REFRESH_TOKEN)") print("💡 支持多个token,使用分号(;)分割:token1;token2;token3") print("-" * 80) try: # 编译protobuf compile_protobuf_at_startup() # 启动后台服务 logger.info("🔄 启动后台服务...") api_service.start_services() logger.success("✅ 后台服务启动完成") logger.info("-" * 80) logger.success("🌟 服务器启动完成!") if Config.require_admin_auth(): logger.info(f"🔐 访问管理界面需要管理员密钥: http://localhost:{Config.SERVER_PORT}") else: logger.info(f"🌐 访问管理界面: http://localhost:{Config.SERVER_PORT}") logger.info("-" * 80) # 启动Flask服务器 app.run( host=Config.SERVER_HOST, port=Config.SERVER_PORT, debug=False, threaded=True ) except KeyboardInterrupt: logger.info("🛑 接收到停止信号") except Exception as e: logger.error(f"❌ 服务器启动失败: {e}") finally: logger.info("🧹 清理资源...") api_service.stop_services() logger.info("👋 服务器已停止") if __name__ == "__main__": main()