Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| 订阅管理器 - 负责下载订阅内容并转换为Clash配置 | |
| """ | |
| import os | |
| import logging | |
| import subprocess | |
| import requests | |
| import time | |
| from urllib.parse import urlparse | |
| logger = logging.getLogger(__name__) | |
| class SubscriptionManager: | |
| """管理订阅链接的下载和配置转换""" | |
| def __init__(self, sub_url, config_path): | |
| """ | |
| 初始化订阅管理器 | |
| Args: | |
| sub_url: 订阅链接URL | |
| config_path: 生成的Clash配置文件保存路径 | |
| """ | |
| self.sub_url = sub_url | |
| self.config_path = os.path.abspath(config_path) | |
| self.subconverter_path = os.path.join( | |
| os.path.dirname(os.path.dirname(os.path.abspath(__file__))), | |
| "subconverter", "subconverter" | |
| ) | |
| # 检查是否设置了订阅链接 | |
| if not sub_url: | |
| raise ValueError("未设置订阅链接 (SUB_URL)") | |
| # 确保配置目录存在 | |
| os.makedirs(os.path.dirname(self.config_path), exist_ok=True) | |
| # 检查subconverter可执行文件是否存在 | |
| if not os.path.exists(self.subconverter_path): | |
| raise FileNotFoundError(f"subconverter可执行文件未找到: {self.subconverter_path}") | |
| def load_and_convert_sub(self): | |
| """ | |
| 下载订阅内容并转换为Clash配置 | |
| Returns: | |
| str: 生成的Clash配置文件路径 | |
| Raises: | |
| RuntimeError: 如果下载或转换失败 | |
| """ | |
| # 下载订阅内容 | |
| sub_content = self._download_subscription() | |
| # 保存订阅内容到临时文件 | |
| temp_file = f"{self.config_path}.raw" | |
| with open(temp_file, "w", encoding="utf-8") as f: | |
| f.write(sub_content) | |
| # 使用subconverter转换为Clash配置 | |
| self._convert_to_clash(temp_file) | |
| # 修改配置文件以确保端口设置正确 | |
| self._patch_config() | |
| # 清理临时文件 | |
| try: | |
| os.remove(temp_file) | |
| except OSError: | |
| pass | |
| return self.config_path | |
| def _download_subscription(self): | |
| """ | |
| 下载订阅内容 | |
| Returns: | |
| str: 订阅内容文本 | |
| Raises: | |
| RuntimeError: 如果下载失败 | |
| """ | |
| logger.info(f"正在下载订阅: {self._mask_url(self.sub_url)}") | |
| try: | |
| headers = { | |
| "User-Agent": "ClashforWindows/0.19.0", | |
| "Accept": "*/*", | |
| } | |
| response = requests.get(self.sub_url, headers=headers, timeout=30) | |
| response.raise_for_status() | |
| content = response.text | |
| if not content or len(content) < 10: | |
| raise RuntimeError("下载的订阅内容为空或过短") | |
| logger.info(f"成功下载订阅,大小: {len(content)} 字节") | |
| return content | |
| except requests.RequestException as e: | |
| logger.error(f"下载订阅失败: {str(e)}") | |
| raise RuntimeError(f"下载订阅失败: {str(e)}") | |
| def _convert_to_clash(self, input_file): | |
| """ | |
| 使用subconverter将订阅内容转换为Clash配置 | |
| Args: | |
| input_file: 包含订阅内容的文件路径 | |
| Raises: | |
| RuntimeError: 如果转换失败 | |
| """ | |
| logger.info(f"正在将订阅转换为Clash配置") | |
| # 准备subconverter命令 | |
| cmd = [ | |
| self.subconverter_path, | |
| "-g", # 生成配置文件 | |
| "--target", "clash", # 输出格式为Clash (修改自 --artifact) | |
| "--input", input_file, # 输入文件 | |
| "--output", self.config_path, # 输出文件 | |
| "--include-remarks", ".*" # 包含所有节点 | |
| ] | |
| # 如果subconverter不存在或执行出错,我们就尝试直接使用订阅内容 | |
| if not os.path.exists(self.subconverter_path): | |
| logger.warning("subconverter不存在,尝试直接使用订阅内容") | |
| with open(input_file, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| with open(self.config_path, "w", encoding="utf-8") as f: | |
| f.write(content) | |
| return | |
| try: | |
| # 执行subconverter | |
| process = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| universal_newlines=True | |
| ) | |
| stdout, stderr = process.communicate(timeout=30) | |
| if process.returncode != 0: | |
| logger.error(f"subconverter执行失败: {stderr}") | |
| # 错误处理:尝试直接使用订阅内容 | |
| with open(input_file, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| with open(self.config_path, "w", encoding="utf-8") as f: | |
| f.write(content) | |
| logger.warning("尝试直接使用订阅内容作为配置文件") | |
| else: | |
| logger.info("成功转换配置") | |
| except (subprocess.SubprocessError, OSError) as e: | |
| logger.error(f"执行subconverter时出错: {str(e)}") | |
| raise RuntimeError(f"配置转换失败: {str(e)}") | |
| def _patch_config(self): | |
| """ | |
| 修改配置文件以确保端口设置正确,并兼容Clash Meta | |
| """ | |
| # 检查配置文件是否存在 | |
| if not os.path.exists(self.config_path): | |
| logger.warning(f"配置文件不存在,无法修补: {self.config_path}") | |
| return | |
| try: | |
| # 读取配置内容 | |
| with open(self.config_path, "r", encoding="utf-8") as f: | |
| config_content = f.read() | |
| # 确保配置包含必要的端口设置 | |
| has_patch = False | |
| # 这里需要检查配置是否为有效的YAML并进行适当修补 | |
| # 为简单起见,我们只检查和添加一些基本端口配置 | |
| if "port: 7890" not in config_content and "mixed-port: 7890" not in config_content: | |
| # 添加混合端口配置 | |
| config_content = "mixed-port: 7890\n" + config_content | |
| has_patch = True | |
| if "external-controller: 127.0.0.1:9090" not in config_content and "external-controller: :9090" not in config_content: | |
| # 添加API控制器配置 (兼容Clash Meta) | |
| config_content = "external-controller: 127.0.0.1:9090\n" + config_content | |
| has_patch = True | |
| # Clash Meta特定配置 | |
| if "find-process-mode: strict" not in config_content: | |
| config_content = "find-process-mode: strict\n" + config_content | |
| has_patch = True | |
| # 确保启用了API | |
| if "secret: " not in config_content: | |
| config_content = "secret: ''\n" + config_content | |
| has_patch = True | |
| # 确保配置了全局策略组 | |
| if "GLOBAL" not in config_content and "- name: GLOBAL" not in config_content: | |
| # 我们可能需要添加全局策略组,但这取决于具体的配置结构 | |
| # 此处简化处理,仅检测,不修改 | |
| logger.warning("未检测到GLOBAL策略组,切换节点功能可能无法正常工作") | |
| # 如果我们修改了配置,保存回文件 | |
| if has_patch: | |
| with open(self.config_path, "w", encoding="utf-8") as f: | |
| f.write(config_content) | |
| logger.info("已修补配置文件以添加必要的设置") | |
| except Exception as e: | |
| logger.error(f"修补配置文件时出错: {str(e)}") | |
| def _mask_url(self, url): | |
| """ | |
| 遮蔽URL中的敏感信息用于日志记录 | |
| Args: | |
| url: 原始URL | |
| Returns: | |
| str: 遮蔽后的URL | |
| """ | |
| try: | |
| parsed = urlparse(url) | |
| netloc = parsed.netloc | |
| # 如果URL包含用户名和密码,则遮蔽密码 | |
| if "@" in netloc: | |
| userpass, host = netloc.split("@", 1) | |
| if ":" in userpass: | |
| user, _ = userpass.split(":", 1) | |
| netloc = f"{user}:***@{host}" | |
| masked_url = url.replace(parsed.netloc, netloc) | |
| # 确保不显示完整的token或密钥 | |
| if "?" in masked_url: | |
| base, query = masked_url.split("?", 1) | |
| masked_url = f"{base}?****" | |
| return masked_url | |
| except Exception: | |
| # 如果解析失败,返回更简单的遮蔽 | |
| return f"{url[:10]}...{url[-5:]}" if len(url) > 15 else "***" |