#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Simple Clash Relay - Flask 应用入口 """ import os import logging import socket import threading from flask import Flask, request, jsonify, Response, redirect, send_from_directory, flash from flask_sockets import Sockets # 导入 Sockets from .clash_manager import ClashManager from .sub_manager import SubscriptionManager from .auth import authenticate import requests from functools import wraps from werkzeug.utils import secure_filename import time import gevent # 导入 gevent from gevent import pywsgi from geventwebsocket.handler import WebSocketHandler # 导入 WebSocketHandler # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # 从环境变量加载配置 SUB_URL = os.environ.get("SUB_URL") API_KEY = os.environ.get("API_KEY", "changeme") FLASK_PORT = int(os.environ.get("FLASK_PORT", 7860)) # 默认端口改为7860 CLASH_PROXY_PORT = int(os.environ.get("CLASH_PROXY_PORT", 7890)) CLASH_API_PORT = int(os.environ.get("CLASH_API_PORT", 9090)) # 添加标记文件路径 MANUAL_CONFIG_MARKER = os.path.join(os.path.dirname(__file__), "data", ".use_manual_config") # 初始化Flask应用 和 Sockets app = Flask(__name__, static_folder='static') app.secret_key = os.environ.get("FLASK_SECRET_KEY", "supersecretkey") # 用于flash消息 sockets = Sockets(app) # 初始化 Sockets # 初始化管理器 clash_manager = None sub_manager = None initialization_error = None @app.before_request def initialize_once(): """应用首次请求前的初始化""" global clash_manager, sub_manager, initialization_error, _initialized # 使用类变量确保只初始化一次 if not getattr(initialize_once, '_initialized', False): logger.info("正在初始化应用 (首次请求)...") # 初始化订阅管理器 sub_manager = SubscriptionManager( sub_url=SUB_URL, config_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "config.yaml") ) # 加载订阅并转换为Clash配置 try: sub_manager.load_and_convert_sub() logger.info("成功加载并转换订阅") except Exception as e: err_msg = f"加载订阅失败: {str(e)}" logger.error(err_msg) initialization_error = err_msg initialize_once._initialized = True # 标记已初始化,即使失败 return # 初始化Clash管理器 clash_manager = ClashManager( config_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "config.yaml"), clash_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "clash_core", "clash.meta-linux-amd64"), api_port=CLASH_API_PORT, proxy_port=CLASH_PROXY_PORT ) # 启动Clash Core try: clash_manager.start_clash() logger.info("成功启动Clash Core") except Exception as e: err_msg = f"启动Clash Core失败: {str(e)}" logger.error(err_msg) initialization_error = err_msg initialize_once._initialized = True # 标记已初始化 @app.route("/api/nodes", methods=["GET"]) @authenticate def get_nodes(): """获取可用节点列表""" global clash_manager, initialization_error if clash_manager is None: return jsonify({ "success": False, "error": f"Clash未启动: {initialization_error or '未知错误'}" }), 503 try: nodes = clash_manager.get_nodes() return jsonify({"success": True, "nodes": nodes}) except Exception as e: logger.error(f"获取节点列表失败: {str(e)}") return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/switch", methods=["PUT"]) @authenticate def switch_node(): """切换到指定节点""" global clash_manager, initialization_error if clash_manager is None: return jsonify({ "success": False, "error": f"Clash未启动: {initialization_error or '未知错误'}" }), 503 data = request.get_json() if not data or "node" not in data: return jsonify({"success": False, "error": "缺少'node'参数"}), 400 node_name = data["node"] try: clash_manager.switch_node(node_name) return jsonify({"success": True, "message": f"已切换到节点: {node_name}"}) except Exception as e: logger.error(f"切换到节点 {node_name} 失败: {str(e)}") return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/current", methods=["GET"]) @authenticate def get_current_node(): """获取当前使用的节点""" global clash_manager, initialization_error if clash_manager is None: return jsonify({ "success": False, "error": f"Clash未启动: {initialization_error or '未知错误'}" }), 503 try: current_node = clash_manager.get_current_node() return jsonify({"success": True, "current_node": current_node}) except Exception as e: logger.error(f"获取当前节点失败: {str(e)}") return jsonify({"success": False, "error": str(e)}), 500 @app.route("/api/refresh", methods=["POST"]) @authenticate def refresh_subscription(): """刷新订阅并重新加载Clash配置(如果未使用手动配置)""" global clash_manager, sub_manager, initialization_error # 检查是否正在使用手动配置 if os.path.exists(MANUAL_CONFIG_MARKER): logger.info("正在使用手动配置文件,跳过订阅刷新。") return jsonify({"success": True, "message": "当前使用手动配置文件,无需刷新订阅。"}) try: # 尝试重新加载订阅 if sub_manager is None: sub_manager = SubscriptionManager( sub_url=SUB_URL, config_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "config.yaml") ) sub_manager.load_and_convert_sub() if clash_manager is None: clash_manager = ClashManager( config_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "config.yaml"), clash_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "clash_core", "clash.meta-linux-amd64"), api_port=CLASH_API_PORT, proxy_port=CLASH_PROXY_PORT ) clash_manager.start_clash() initialization_error = None return jsonify({"success": True, "message": "订阅已刷新,Clash已启动"}) else: clash_manager.restart_clash() initialization_error = None return jsonify({"success": True, "message": "订阅已刷新,Clash已重启"}) except Exception as e: error_msg = f"刷新订阅失败: {str(e)}" logger.error(error_msg) initialization_error = error_msg return jsonify({"success": False, "error": error_msg}), 500 @app.route("/health", methods=["GET"]) def health_check(): """健康检查接口""" return jsonify({"status": "ok"}) # --- 代理路由 --- # 弃用/proxy路由,Clash的代理端口由外部直接访问 # 如果需要在Hugging Face部署,代理通常通过Clash API的节点选择完成 # --- 调试路由 --- @app.route("/debug/clean", methods=["POST"]) @authenticate def debug_clean(): """清理配置、标记文件并重新初始化(恢复使用订阅)""" global clash_manager, sub_manager, initialization_error try: # 停止Clash if clash_manager is not None: clash_manager.stop_clash() clash_manager = None # 删除配置文件和标记文件 config_dir = os.path.join(os.path.dirname(__file__), "data") config_path = os.path.join(config_dir, "config.yaml") raw_config_path = f"{config_path}.raw" marker_path = MANUAL_CONFIG_MARKER files_to_delete = [config_path, raw_config_path, marker_path] deleted_files = [] for file_path in files_to_delete: if os.path.exists(file_path): try: os.remove(file_path) deleted_files.append(os.path.basename(file_path)) logger.info(f"已删除文件: {file_path}") except OSError as e: logger.warning(f"删除文件失败: {file_path}, 错误: {e}") # 强制重新初始化 (将恢复使用订阅,因为标记文件已删除) initialize_once._initialized = False initialize_once() status_msg = "配置已清理,恢复使用订阅链接。" if initialization_error is None else f"配置已清理,但重新初始化失败: {initialization_error}" return jsonify({"success": True, "message": status_msg, "deleted_files": deleted_files}) except Exception as e: logger.exception(f"清理配置时出错: {str(e)}") return jsonify({"success": False, "error": str(e)}), 500 @app.route("/debug/config", methods=["GET"]) @authenticate def debug_show_config(): """获取并显示当前Clash配置文件内容""" config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "config.yaml") if not os.path.exists(config_path): return jsonify({"success": False, "error": "配置文件不存在"}), 404 try: with open(config_path, "r", encoding="utf-8") as f: content = f.read() return jsonify({"success": True, "content": content}) except Exception as e: logger.error(f"读取配置文件时出错: {str(e)}") return jsonify({"success": False, "error": str(e)}), 500 # --- Yacd UI & Clash API 反向代理路由 --- @app.route('/ui/') def yacd_index(): """提供Yacd UI的入口文件""" # return send_from_directory('static/yacd', 'index.html') # 重定向到包含 index.html 的目录,确保相对路径正确加载 return redirect('/ui/index.html', code=301) @app.route('/ui/') def yacd_static(path): """提供Yacd UI的静态文件 (CSS, JS, images)""" return send_from_directory(os.path.join(app.static_folder, 'yacd'), path) @app.route('/clashapi/', methods=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'HEAD', 'PATCH']) def clash_api_proxy(subpath): """反向代理请求到内部的Clash API""" global clash_manager, initialization_error, API_KEY # 1. 认证检查 (Yacd 使用 Authorization: Bearer 或 ?token=) auth_header = request.headers.get('Authorization') token = request.args.get('token') provided_key = None if auth_header and auth_header.startswith('Bearer '): provided_key = auth_header.split(' ', 1)[1] elif token: provided_key = token if provided_key != API_KEY: logger.warning(f"Clash API代理认证失败,路径: {subpath}") return jsonify({"message": "Authentication required"}), 401 # 2. 检查Clash是否运行 if clash_manager is None: logger.error(f"Clash API不可用,无法代理请求: {subpath}") return jsonify({ "success": False, "error": f"Clash API未运行: {initialization_error or '内部错误'}" }), 503 # 特殊处理 /logs 路径 (这是WebSocket接口,我们不支持) if subpath == 'logs': logger.info("请求了日志WebSocket接口,但当前版本不支持WebSocket代理") return jsonify({ "success": False, "error": "不支持WebSocket日志接口。请使用标准HTTP API。" }), 400 # 3. 构建目标URL target_url = f"http://127.0.0.1:{CLASH_API_PORT}/{subpath}" if request.query_string: target_url += '?' + request.query_string.decode('utf-8') logger.debug(f"代理Clash API请求到: {target_url}") # 4. 转发请求 try: # 准备请求头,移除Host req_headers = { k: v for k, v in request.headers.items() if k.lower() not in ['host', 'content-length'] } # 确保Content-Type正确传递,尤其对PUT请求 if request.method in ['POST', 'PUT', 'PATCH'] and 'Content-Type' not in req_headers: req_headers['Content-Type'] = 'application/json' # 对于PUT请求,确保正确获取请求数据 if request.method == 'PUT': logger.debug(f"处理PUT请求到 {target_url}, 数据: {request.data}") req_data = request.get_data() else: req_data = request.get_data() # 普通HTTP请求 resp = requests.request( method=request.method, url=target_url, headers=req_headers, data=req_data, cookies=request.cookies, allow_redirects=False, stream=True, # 对于大响应或流式响应 timeout=10 # 缩短超时,防止工作进程卡住 ) # 5. 构建并返回响应 excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection'] resp_headers = [(name, value) for name, value in resp.raw.headers.items() if name.lower() not in excluded_headers] # 使用iter_content流式传输响应体,以支持大数据和流 response = Response(resp.iter_content(chunk_size=8192), resp.status_code, resp_headers) return response except requests.exceptions.ConnectionError as e: logger.error(f"连接Clash API失败 ({target_url}): {str(e)}") return jsonify({"error": f"无法连接到内部Clash API: {str(e)}"}), 502 # Bad Gateway except requests.exceptions.Timeout as e: logger.error(f"请求Clash API超时 ({target_url}): {str(e)}") return jsonify({"error": f"请求内部Clash API超时: {str(e)}"}), 504 # Gateway Timeout except Exception as e: logger.error(f"代理Clash API请求时发生意外错误: {str(e)}") return jsonify({"error": f"代理请求时发生意外错误: {str(e)}"}), 500 # --- 新增:上传配置 API --- @app.route("/api/upload_config", methods=["POST"]) @authenticate def upload_config(): """上传并使用自定义配置文件""" global clash_manager, initialization_error if 'config_file' not in request.files: logger.error("上传配置请求中未找到名为 'config_file' 的文件") return jsonify({"success": False, "error": "未找到配置文件部分"}), 400 file = request.files['config_file'] if file.filename == '' or not file: logger.error("上传的文件无效或未选择") return jsonify({"success": False, "error": "未选择文件或文件无效"}), 400 # 限制文件名,防止路径遍历 filename = secure_filename(file.filename) if not filename.lower().endswith(('.yaml', '.yml')): logger.error(f"上传的文件类型不支持: {filename}") return jsonify({"success": False, "error": "只允许上传 .yaml 或 .yml 文件"}), 400 # 修正路径,确保指向 /app/data/ app_root = os.path.dirname(os.path.dirname(__file__)) config_dir = os.path.join(app_root, "data") config_path = os.path.join(config_dir, "config.yaml") marker_path = os.path.join(config_dir, ".use_manual_config") # 保持 marker 在 data 目录 try: # 确保数据目录存在 os.makedirs(config_dir, exist_ok=True) # 保存上传的文件,覆盖旧的 config.yaml file.save(config_path) logger.info(f"成功保存上传的配置文件到: {config_path}") # 创建标记文件,表示正在使用手动配置 with open(marker_path, 'w') as f: f.write(f"Uploaded at {time.time()}") logger.info(f"已创建手动配置标记文件: {marker_path}") # 重启Clash以加载新配置 if clash_manager is not None: logger.info("正在重启Clash以加载手动配置...") clash_manager.restart_clash() initialization_error = None # 清除之前的初始化错误 logger.info("Clash已使用手动配置重启") else: # 如果Clash未运行,尝试初始化并启动 logger.info("Clash未运行,尝试使用手动配置进行初始化和启动...") initialize_once._initialized = False initialize_once() if initialization_error: logger.error(f"使用手动配置启动Clash失败: {initialization_error}") # 即使启动失败,也保留手动配置和标记 else: logger.info("Clash已使用手动配置成功启动") return jsonify({"success": True, "message": "配置文件已上传并应用,Clash已重启。"}) except Exception as e: logger.exception(f"处理上传的配置文件时出错: {str(e)}") # 清理可能不完整的状态 if os.path.exists(marker_path): os.remove(marker_path) return jsonify({"success": False, "error": f"处理上传文件时出错: {str(e)}"}), 500 # --- 新增:WebSocket代理隧道 --- def forward_websocket_to_tcp(ws, tcp_socket): """从WebSocket读取数据并写入TCP Socket""" try: while not ws.closed: message = ws.receive() if message: # logger.debug(f"WS -> TCP: 转发 {len(message)} 字节") tcp_socket.sendall(message) else: # WebSocket连接关闭或收到空消息 break except Exception as e: logger.warning(f"WS -> TCP 转发错误: {e}") finally: logger.info("WS -> TCP 转发协程结束") if not tcp_socket._closed: tcp_socket.close() if not ws.closed: ws.close() def forward_tcp_to_websocket(tcp_socket, ws): """从TCP Socket读取数据并写入WebSocket""" try: while not ws.closed: data = tcp_socket.recv(4096) # 每次最多读取4KB if data: # logger.debug(f"TCP -> WS: 转发 {len(data)} 字节") ws.send(data) else: # TCP连接关闭 break except Exception as e: # 忽略WebSocket可能已经关闭的错误 if "closed" not in str(e).lower(): logger.warning(f"TCP -> WS 转发错误: {e}") finally: logger.info("TCP -> WS 转发协程结束") if not tcp_socket._closed: tcp_socket.close() if not ws.closed: ws.close() @sockets.route('/wsproxy') def websocket_proxy_tunnel(ws): """处理WebSocket连接,建立到Clash代理端口的TCP隧道""" global clash_manager, initialization_error # +++ 添加日志记录 +++ logger.info(f"收到 /wsproxy 请求,来自: {request.remote_addr}") logger.info("请求头:") for header, value in request.headers.items(): logger.info(f" {header}: {value}") # --- 日志记录结束 --- logger.info(f"开始处理 WebSocket 连接...") # 修改日志信息 # 1. 检查Clash是否运行 if clash_manager is None: logger.error(f"Clash服务未运行,无法建立WebSocket隧道") # 对于WebSocket错误,最好是直接关闭而不是返回HTTP错误 # ws.close(reason="Clash service is not running") # 但这里因为还没进入ws上下文,可能需要不同的处理 # 尝试返回一个错误,虽然客户端可能不期望 return "Clash service not running", 503 # 2. 建立到内部Clash代理端口的TCP连接 target_host = "127.0.0.1" target_port = CLASH_PROXY_PORT # 7890 tcp_socket = None try: tcp_socket = socket.create_connection((target_host, target_port), timeout=5) logger.info(f"成功连接到内部Clash代理端口: {target_host}:{target_port}") except Exception as e: logger.error(f"连接到内部Clash代理端口失败: {e}") ws.close(reason=f"Failed to connect to internal proxy: {e}") return # 3. 创建两个协程进行双向数据转发 logger.info("启动WebSocket和TCP之间的双向转发协程...") ws_to_tcp_greenlet = gevent.spawn(forward_websocket_to_tcp, ws, tcp_socket) tcp_to_ws_greenlet = gevent.spawn(forward_tcp_to_websocket, tcp_socket, ws) # 4. 等待任一协程结束 (表示连接中断) try: gevent.joinall([ws_to_tcp_greenlet, tcp_to_ws_greenlet], raise_error=True) except Exception as e: logger.warning(f"转发协程出现错误: {e}") finally: logger.info("WebSocket隧道连接已关闭") if tcp_socket and not tcp_socket._closed: tcp_socket.close() if ws and not ws.closed: ws.close() # --- 基础路由 --- @app.route('/', methods=['GET']) def index(): """首页 - 提供说明、状态和操作""" global initialization_error using_manual_config = os.path.exists(MANUAL_CONFIG_MARKER) status = "运行中" if initialization_error is None else "初始化失败" error_msg = "" if initialization_error is None else f"

错误: {initialization_error}

" config_mode_msg = "

当前模式:手动配置文件

" if using_manual_config else "

当前模式:订阅链接

" # 刷新按钮状态 refresh_button_disabled = "disabled" if using_manual_config else "" refresh_button_title = "title='当前使用手动配置,无需刷新订阅'" if using_manual_config else "" yacd_link = "访问高级控制面板 (Yacd)" return f""" Simple Clash Relay

Simple Clash Relay

状态: {status}

{error_msg} {config_mode_msg}

控制面板

{yacd_link}
⚠️ 重要提示:
  • 首次使用高级控制面板时,您需要在设置中填入:
    • 外部控制器地址:/clashapi
    • 密钥:changeme (除非您自定义了API_KEY)
  • 当前版本对控制面板有以下限制:
    • 不支持WebSocket接口,因此日志查看功能不可用
    • 某些PUT操作可能不稳定,如果遇到问题请刷新页面

基本操作

上传手动配置

您可以上传自己的 config.yaml 文件来覆盖订阅链接。上传后,系统将只使用此文件,不再进行订阅更新。

要恢复使用订阅链接,请使用下面的"清理配置并重启"按钮。

调试与恢复

帮助

API密钥可在 .env 文件或环境变量中设置 (API_KEY)。默认为 'changeme'。

高级控制面板 (Yacd) 需要在设置中配置 API 地址为 /clashapi 并提供密钥。

更多信息请参考项目文档或README。

""" if __name__ == "__main__": # 如果直接运行此文件,将初始化应用并启动Flask服务器 initialize_once() logger.info(f"启动 Flask/Gevent WebSocket 服务器,监听端口: {FLASK_PORT}") # 使用 gevent 的 WSGI 服务器来支持 WebSocket server = pywsgi.WSGIServer(('0.0.0.0', FLASK_PORT), app, handler_class=WebSocketHandler) server.serve_forever()