"""Flask entry point: token-management routes, `/v1/models`, `/v1/chat/completions` and `/status`.

Chat requests are forwarded to ``CONFIG['API']['BASE_URL']`` using cookies
built from the SSO tokens held by the module-level ``token_manager``.
"""
import os
import json
import uuid
import time
import base64
import sys
import inspect
import secrets

import requests
from flask import Flask, request, Response, jsonify, stream_with_context, render_template, redirect, session
from curl_cffi import requests as curl_requests
from werkzeug.middleware.proxy_fix import ProxyFix

from src.config import CONFIG, DEFAULT_HEADERS  # configuration and default request headers
from src.core.logger import logger
from src.core.auth_token_manager import AuthTokenManager
from src.core.utils import Utils
from src.core.grok_api_client import GrokApiClient
from src.core.message_processor import MessageProcessor


def initialization():
    """Load SSO tokens from the comma-separated ``SSO`` environment variable.

    Each non-empty entry is registered on the module-level ``token_manager``
    as a ``sso-rw=<sso>;sso=<sso>`` cookie string. Must be called after
    ``token_manager`` has been created.
    """
    sso_array = os.getenv("SSO", "").split(',')
    logger.info("开始加载令牌", "Server")
    for sso in sso_array:
        # Tolerate "a, b" style lists: surrounding whitespace is never part of a token.
        sso = sso.strip()
        if sso:
            token_manager.add_token(f"sso-rw={sso};sso={sso}")

    logger.info(f"成功加载令牌: {json.dumps(token_manager.get_all_tokens(), indent=2)}", "Server")
    logger.info(f"令牌加载完成,共加载: {len(token_manager.get_all_tokens())}个令牌", "Server")

    if CONFIG["API"]["PROXY"]:
        logger.info(f"代理已设置: {CONFIG['API']['PROXY']}", "Server")

    logger.info("初始化完成", "Server")


# Template and static folders are given explicitly so they are found in production.
app = Flask(__name__, template_folder='./templates', static_folder='./static')
app.wsgi_app = ProxyFix(app.wsgi_app)
# Falls back to a random key, so sessions do not survive a restart unless
# FLASK_SECRET_KEY is set.
app.secret_key = os.getenv('FLASK_SECRET_KEY') or secrets.token_hex(16)
app.json.sort_keys = False


def _get_bearer_token():
    """Return the ``Authorization`` header value with ``'Bearer '`` removed ('' if absent)."""
    return request.headers.get('Authorization', '').replace('Bearer ', '')


def _secure_equals(provided, expected):
    """Compare a client-supplied secret with the configured one.

    Two strings are compared in constant time; any other combination
    (e.g. a missing value) falls back to plain equality, so the result is
    always the same as ``provided == expected``.
    """
    if isinstance(provided, str) and isinstance(expected, str):
        # Encode first: compare_digest rejects non-ASCII str arguments.
        return secrets.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
    return provided == expected


def _json_field(name):
    """Return field *name* from the JSON request body, or None.

    None is returned when the body is missing, is not valid JSON, or is not
    a JSON object, so callers can answer 400 instead of failing with 500.
    """
    payload = request.get_json(silent=True)
    if isinstance(payload, dict):
        return payload.get(name)
    return None


@app.route('/manager/login', methods=['GET', 'POST'])
def manager_login():
    """Render the manager login form and handle password submission.

    Redirects to ``/`` when the manager UI is switched off in CONFIG.
    """
    if not CONFIG["ADMIN"]["MANAGER_SWITCH"]:
        return redirect('/')

    if request.method == 'POST':
        password = request.form.get('password')
        if _secure_equals(password, CONFIG["ADMIN"]["PASSWORD"]):
            session['is_logged_in'] = True
            return redirect('/manager')
        return render_template('login.html', error=True)

    return render_template('login.html', error=False)


def check_auth():
    """Return True when the current session has passed the manager login."""
    return session.get('is_logged_in', False)


@app.route('/manager')
def manager():
    """Render the manager page, redirecting to the login page when not logged in."""
    if not check_auth():
        return redirect('/manager/login')
    return render_template('manager.html')


@app.route('/manager/api/get')
def get_manager_tokens():
    """Return the token status map as JSON (manager session required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    return jsonify(token_manager.get_token_status_map())


@app.route('/manager/api/add', methods=['POST'])
def add_manager_token():
    """Add the SSO token given in the JSON body (manager session required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        sso = _json_field('sso')
        if not sso:
            return jsonify({"error": "SSO token is required"}), 400
        token_manager.add_token(f"sso-rw={sso};sso={sso}")
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/manager/api/delete', methods=['POST'])
def delete_manager_token():
    """Delete the SSO token given in the JSON body (manager session required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        sso = _json_field('sso')
        if not sso:
            return jsonify({"error": "SSO token is required"}), 400
        token_manager.delete_token(f"sso-rw={sso};sso={sso}")
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/manager/api/cf_clearance', methods=['POST'])
def setCf_Manager_clearance():
    """Store the ``cf_clearance`` value from the JSON body in CONFIG (manager session required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        cf_clearance = _json_field('cf_clearance')
        if not cf_clearance:
            return jsonify({"error": "cf_clearance is required"}), 400
        CONFIG["SERVER"]['CF_CLEARANCE'] = cf_clearance
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/get/tokens', methods=['GET'])
def get_tokens():
    """Return the token status map; requires the API key and is disabled in custom-SSO mode."""
    auth_token = _get_bearer_token()
    if CONFIG["API"]["IS_CUSTOM_SSO"]:
        return jsonify({"error": '自定义的SSO令牌模式无法获取轮询sso令牌状态'}), 403
    elif not _secure_equals(auth_token, CONFIG["API"]["API_KEY"]):
        return jsonify({"error": 'Unauthorized'}), 401
    return jsonify(token_manager.get_token_status_map())


@app.route('/add/token', methods=['POST'])
def add_token():
    """Add the SSO token given in the JSON body; requires the API key.

    Responds 403 in custom-SSO mode, 401 on a wrong key, 400 when ``sso`` is
    missing, and 500 when the token manager raises.
    """
    auth_token = _get_bearer_token()
    if CONFIG["API"]["IS_CUSTOM_SSO"]:
        return jsonify({"error": '自定义的SSO令牌模式无法添加sso令牌'}), 403
    elif not _secure_equals(auth_token, CONFIG["API"]["API_KEY"]):
        return jsonify({"error": 'Unauthorized'}), 401

    # Without this check a missing field would register "sso-rw=None;sso=None".
    sso = _json_field('sso')
    if not sso:
        return jsonify({"error": "SSO token is required"}), 400

    try:
        token_manager.add_token(f"sso-rw={sso};sso={sso}")
        return jsonify(token_manager.get_token_status_map().get(sso, {})), 200
    except Exception as error:
        logger.error(str(error), "Server")
        return jsonify({"error": '添加sso令牌失败'}), 500


@app.route('/set/cf_clearance', methods=['POST'])
def setCf_clearance():
    """Store the ``cf_clearance`` value from the JSON body in CONFIG; requires the API key."""
    auth_token = _get_bearer_token()
    if not _secure_equals(auth_token, CONFIG["API"]["API_KEY"]):
        return jsonify({"error": 'Unauthorized'}), 401
    try:
        # A missing field stores None here (unlike the manager route, which
        # rejects it); kept as-is because callers may rely on it to clear the value.
        cf_clearance = request.json.get('cf_clearance')
        CONFIG["SERVER"]['CF_CLEARANCE'] = cf_clearance
        return jsonify({"message": '设置cf_clearance成功'}), 200
    except Exception as error:
        logger.error(str(error), "Server")
        return jsonify({"error": '设置cf_clearance失败'}), 500


@app.route('/delete/token', methods=['POST'])
def delete_token():
    """Delete the SSO token given in the JSON body; requires the API key.

    Responds 403 in custom-SSO mode, 401 on a wrong key, 400 when ``sso`` is
    missing, and 500 when the token manager raises.
    """
    auth_token = _get_bearer_token()
    if CONFIG["API"]["IS_CUSTOM_SSO"]:
        return jsonify({"error": '自定义的SSO令牌模式无法删除sso令牌'}), 403
    elif not _secure_equals(auth_token, CONFIG["API"]["API_KEY"]):
        return jsonify({"error": 'Unauthorized'}), 401

    sso = _json_field('sso')
    if not sso:
        return jsonify({"error": "SSO token is required"}), 400

    try:
        token_manager.delete_token(f"sso-rw={sso};sso={sso}")
        return jsonify({"message": '删除sso令牌成功'}), 200
    except Exception as error:
        logger.error(str(error), "Server")
        return jsonify({"error": '删除sso令牌失败'}), 500


@app.route('/v1/models', methods=['GET'])
def get_models():
    """List the model ids configured in ``CONFIG["MODELS"]``."""
    created = int(time.time())
    return jsonify({
        "object": "list",
        "data": [
            {
                "id": model,
                "object": "model",
                "created": created,
                "owned_by": "grok"
            }
            for model in CONFIG["MODELS"]
        ]
    })


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """Forward a chat request upstream, rotating tokens across retries.

    Up to ``CONFIG["RETRY"]["MAX_ATTEMPTS"]`` attempts are made. A 200
    upstream response is returned to the client (streamed when the request
    sets ``stream``); other statuses adjust the token pool and the loop
    retries. Every failure path ends in a JSON error body with a non-2xx
    status code.
    """
    response_status_code = 500
    try:
        auth_token = _get_bearer_token()
        if not auth_token:
            return jsonify({"error": 'API_KEY缺失'}), 401
        if CONFIG["API"]["IS_CUSTOM_SSO"]:
            # In custom-SSO mode the bearer value itself is used as the SSO token.
            result = f"sso={auth_token};sso-rw={auth_token}"
            token_manager.set_token(result)
        elif not _secure_equals(auth_token, CONFIG["API"]["API_KEY"]):
            return jsonify({"error": 'Unauthorized'}), 401

        data = request.json
        model = data.get("model")
        stream = data.get("stream", False)

        retry_count = 0
        grok_client = GrokApiClient(model)
        request_payload = grok_client.prepare_chat_request(data)

        while retry_count < CONFIG["RETRY"]["MAX_ATTEMPTS"]:
            retry_count += 1
            # Pass the token_manager instance explicitly to avoid a circular import.
            # NOTE(review): the cookie is stored in the shared CONFIG dict, so
            # concurrent requests overwrite each other's value — confirm whether
            # other modules read it before moving it to a local variable.
            CONFIG["API"]["SIGNATURE_COOKIE"] = Utils.create_auth_headers(token_manager, model)

            if not CONFIG["API"]["SIGNATURE_COOKIE"]:
                raise ValueError('该模型无可用令牌')

            logger.info(
                f"当前令牌: {json.dumps(CONFIG['API']['SIGNATURE_COOKIE'], indent=2)}", "Server")
            logger.info(
                f"当前可用模型的全部可用数量: {json.dumps(token_manager.get_remaining_token_request_capacity(), indent=2)}", "Server")

            if CONFIG['SERVER']['CF_CLEARANCE']:
                CONFIG["SERVER"]['COOKIE'] = f"{CONFIG['API']['SIGNATURE_COOKIE']};{CONFIG['SERVER']['CF_CLEARANCE']}"
            else:
                CONFIG["SERVER"]['COOKIE'] = CONFIG['API']['SIGNATURE_COOKIE']
            logger.info(json.dumps(request_payload, indent=2), "Server")

            try:
                proxy_options = Utils.get_proxy_options()
                # NOTE(review): verify=False disables TLS certificate checks for
                # the upstream call — confirm this is intentional.
                response = curl_requests.post(
                    f"{CONFIG['API']['BASE_URL']}/rest/app-chat/conversations/new",
                    headers={
                        **DEFAULT_HEADERS,
                        "Cookie": CONFIG["SERVER"]['COOKIE']
                    },
                    json=request_payload,
                    impersonate="chrome133a",
                    verify=False,
                    stream=True,
                    **proxy_options)
                logger.info(CONFIG["SERVER"]['COOKIE'], "Server")

                if response.status_code == 200:
                    response_status_code = 200
                    logger.info("请求成功", "Server")
                    logger.info(
                        f"当前{model}剩余可用令牌数: {token_manager.get_token_count_for_model(model)}", "Server")

                    try:
                        if stream:
                            return Response(stream_with_context(
                                MessageProcessor.handle_stream_response(response, model)),
                                content_type='text/event-stream')
                        else:
                            content = MessageProcessor.handle_non_stream_response(response, model)
                            return jsonify(
                                MessageProcessor.create_chat_response(content, model))
                    except Exception as error:
                        # Processing the 200 response failed: drop this token for
                        # the model and let the enclosing handler retry.
                        logger.error(str(error), "Server")
                        if CONFIG["API"]["IS_CUSTOM_SSO"]:
                            raise ValueError(f"自定义SSO令牌当前模型{model}的请求次数已失效")
                        token_manager.remove_token_from_model(model, CONFIG["API"]["SIGNATURE_COOKIE"])
                        if token_manager.get_token_count_for_model(model) == 0:
                            raise ValueError(f"{model} 次数已达上限,请切换其他模型或者重新对话")

                elif response.status_code == 403:
                    response_status_code = 403
                    # Give back the request count consumed by this failed attempt so
                    # failures do not push the token toward its limit.
                    token_manager.reduce_token_request_count(model, 1)
                    if token_manager.get_token_count_for_model(model) == 0:
                        raise ValueError(f"{model} 次数已达上限,请切换其他模型或者重新对话")
                    raise ValueError(f"IP暂时被封无法破盾,请稍后重试或者更换ip")

                elif response.status_code == 429:
                    response_status_code = 429
                    token_manager.reduce_token_request_count(model, 1)
                    if CONFIG["API"]["IS_CUSTOM_SSO"]:
                        raise ValueError(f"自定义SSO令牌当前模型{model}的请求次数已失效")

                    token_manager.remove_token_from_model(
                        model, CONFIG["API"]["SIGNATURE_COOKIE"])
                    if token_manager.get_token_count_for_model(model) == 0:
                        raise ValueError(f"{model} 次数已达上限,请切换其他模型或者重新对话")

                else:
                    if CONFIG["API"]["IS_CUSTOM_SSO"]:
                        raise ValueError(f"自定义SSO令牌当前模型{model}的请求次数已失效")

                    logger.error(f"令牌异常错误状态!status: {response.status_code}", "Server")
                    token_manager.remove_token_from_model(model, CONFIG["API"]["SIGNATURE_COOKIE"])
                    logger.info(
                        f"当前{model}剩余可用令牌数: {token_manager.get_token_count_for_model(model)}",
                        "Server")

            except Exception as e:
                logger.error(f"请求处理异常: {str(e)}", "Server")
                # A custom SSO token has no alternatives to rotate to, so fail now.
                if CONFIG["API"]["IS_CUSTOM_SSO"]:
                    raise
                continue

        # All attempts used up without a successful response.
        if response_status_code == 403:
            raise ValueError('IP暂时被封无法破盾,请稍后重试或者更换ip')
        # Raise for every remaining status (not only 500): falling off the end
        # would make the view return None, which Flask rejects.
        raise ValueError('当前模型所有令牌暂无可用,请稍后重试')

    except Exception as error:
        logger.error(str(error), "ChatAPI")
        # The last upstream status may be 200 (response received, processing
        # failed); never send an error body with a success status.
        error_status = response_status_code if response_status_code >= 400 else 500
        return jsonify(
            {"error": {
                "message": str(error),
                "type": "server_error"
            }}), error_status


@app.route('/')
def index():
    """Render the system status page."""
    return render_template('index.html')


@app.route('/status')
def get_system_status():
    """Return configuration, token status and recent logs as JSON.

    NOTE(review): this route performs no authentication, yet it returns the
    proxy setting, per-token details and recent log lines — confirm that
    exposing these publicly is intended.
    """
    logger.info("访问 /status 路由", "StatusAPI")
    try:
        # Configuration summary (adjust to the actual CONFIG structure).
        config_info = {
            "api_base_url": CONFIG["API"]["BASE_URL"],
            "log_level": logger.level,
            "is_temp_conversation": CONFIG["API"]["IS_TEMP_CONVERSATION"],
            "is_custom_sso": CONFIG["API"]["IS_CUSTOM_SSO"],
            "proxy": CONFIG["API"]["PROXY"],
            "manager_switch": CONFIG["ADMIN"]["MANAGER_SWITCH"],
            "show_thinking": CONFIG["SHOW_THINKING"],
            "isshow_search_results": CONFIG["ISSHOW_SEARCH_RESULTS"]
        }

        # Token pool status.
        token_status = {
            "total_tokens": token_manager.get_total_token_count(),
            "available_tokens": token_manager.get_remaining_token_request_capacity(),
            "total_request_count": token_manager.get_total_request_count(),
            "expired_tokens_count": len(token_manager.get_expired_tokens()),
            "token_details": token_manager.get_token_status_map()  # per-token detail
        }

        # Recent log lines.
        log_summary = logger.get_recent_logs()

        response_data = {
            "config": config_info,
            "token_status": token_status,
            "log_summary": log_summary
        }
        logger.info(f"/status 路由返回数据: {json.dumps(response_data, indent=2)}", "StatusAPI")
        return jsonify(response_data)
    except Exception as e:
        logger.error(f"访问 /status 路由时发生错误: {str(e)}", "StatusAPI")
        return jsonify({"error": "无法获取系统状态"}), 500


# Created at import time so the routes above (and initialization) can use it.
token_manager = AuthTokenManager()
initialization()

if __name__ == '__main__':
    app.run(
        host='0.0.0.0',
        port=CONFIG["SERVER"]["PORT"],
        debug=False
    )