clash / app /sub_manager.py
clash-linux's picture
Upload 20 files
115e454 verified
raw
history blame
22.3 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
订阅管理器 - 负责下载订阅内容并转换为Clash配置
"""
import os
import logging
import subprocess
import requests
import time
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
class SubscriptionManager:
"""管理订阅链接的下载和配置转换"""
def __init__(self, sub_url, config_path):
"""
初始化订阅管理器
Args:
sub_url: 订阅链接URL
config_path: 生成的Clash配置文件保存路径
"""
self.sub_url = sub_url
self.config_path = os.path.abspath(config_path)
self.subconverter_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"subconverter", "subconverter"
)
# 检查是否设置了订阅链接
if not sub_url:
raise ValueError("未设置订阅链接 (SUB_URL)")
# 确保配置目录存在
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
# 检查subconverter可执行文件是否存在
if not os.path.exists(self.subconverter_path):
raise FileNotFoundError(f"subconverter可执行文件未找到: {self.subconverter_path}")
def load_and_convert_sub(self):
"""
下载订阅内容并转换为Clash配置
Returns:
str: 生成的Clash配置文件路径
Raises:
RuntimeError: 如果下载或转换失败
"""
# 下载订阅内容
sub_content = self._download_subscription()
# 保存订阅内容到临时文件
temp_file = f"{self.config_path}.raw"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(sub_content)
# 使用subconverter转换为Clash配置
self._convert_to_clash(temp_file)
# 修改配置文件以确保端口设置正确
self._patch_config()
# 清理临时文件
try:
os.remove(temp_file)
except OSError:
pass
return self.config_path
def _download_subscription(self):
"""
下载订阅内容
Returns:
str: 订阅内容文本
Raises:
RuntimeError: 如果下载失败
"""
logger.info(f"正在下载订阅: {self._mask_url(self.sub_url)}")
try:
headers = {
"User-Agent": "ClashforWindows/0.19.0",
"Accept": "*/*",
}
response = requests.get(self.sub_url, headers=headers, timeout=30)
response.raise_for_status()
content = response.text
if not content or len(content) < 10:
raise RuntimeError("下载的订阅内容为空或过短")
logger.info(f"成功下载订阅,大小: {len(content)} 字节")
return content
except requests.RequestException as e:
logger.error(f"下载订阅失败: {str(e)}")
raise RuntimeError(f"下载订阅失败: {str(e)}")
def _convert_to_clash(self, input_file):
"""
使用subconverter将订阅内容转换为Clash配置
Args:
input_file: 包含订阅内容的文件路径
Raises:
RuntimeError: 如果转换失败
"""
logger.info(f"正在将订阅转换为Clash配置")
logger.info(f"输入文件: {input_file}, 配置路径: {self.config_path}")
# 确保数据目录存在
data_dir = os.path.dirname(self.config_path)
os.makedirs(data_dir, exist_ok=True)
# 准备subconverter命令 - 使用 -i/-o 并强制指定空白配置
minimal_pref_path = os.path.join(os.path.dirname(self.subconverter_path), "minimal_pref.yml")
cmd = [
self.subconverter_path,
"-i", input_file, # 输入文件
"-o", self.config_path, # 输出文件
"-t", "clash", # 目标格式
"--config", minimal_pref_path # 强制使用空白配置,避免默认行为
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 检查 subconverter 是否存在 (之前的代码已修复)
if not os.path.exists(self.subconverter_path):
# ... (省略 subconverter 未找到的回退逻辑) ...
logger.error(f"subconverter未找到: {self.subconverter_path}. 无法转换配置。")
raise RuntimeError(f"无法转换配置 (subconverter未找到)")
# --- 正式执行 subconverter ---
try:
# 执行subconverter
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
cwd=os.path.dirname(self.subconverter_path) # 尝试在subconverter目录下运行
)
stdout, stderr = process.communicate(timeout=60)
# 记录完整的 subconverter 输出用于调试
logger.info(f"subconverter STDOUT:\n---\n{stdout}\n---")
if stderr:
logger.warning(f"subconverter STDERR:\n---\n{stderr}\n---")
# 检查 subconverter 返回码
if process.returncode != 0:
logger.error(f"subconverter执行失败,返回码: {process.returncode}.")
# ... (省略 subconverter 失败的回退逻辑) ...
raise RuntimeError(f"subconverter转换失败(code {process.returncode})。 Subconverter STDERR: {stderr[:200]}...")
else:
# subconverter 返回码为 0,表示理论上成功
logger.info("subconverter成功执行 (返回码 0)")
# 验证输出文件是否存在且非空
if os.path.exists(self.config_path) and os.path.getsize(self.config_path) > 0:
logger.info(f"配置文件已生成,路径: {self.config_path},大小: {os.path.getsize(self.config_path)} 字节")
else:
logger.error(f"subconverter执行成功,但配置文件不存在或为空: {self.config_path}. 请检查subconverter日志了解详情。")
# 检查STDERR中是否有线索
error_detail = stderr if stderr else "(无错误输出)"
raise RuntimeError(f"subconverter执行成功但未生成有效配置文件。 Subconverter STDERR: {error_detail[:500]}...")
except subprocess.TimeoutExpired:
logger.error(f"执行subconverter超时 (超过60秒)")
raise RuntimeError("subconverter执行超时")
except (subprocess.SubprocessError, OSError) as e:
logger.error(f"执行subconverter时发生错误: {str(e)}")
raise RuntimeError(f"配置转换失败: {str(e)}")
def _add_clash_headers(self):
"""添加基本的Clash配置头"""
return """# 自动生成的Clash配置
port: 7890
socks-port: 7891
mixed-port: 7890
allow-lan: true
mode: Rule
log-level: info
external-controller: 127.0.0.1:9090
secret: ""
"""
def _patch_config(self):
"""
修改配置文件以确保端口设置正确,并兼容Clash Meta
"""
# 检查配置文件是否存在
if not os.path.exists(self.config_path):
logger.warning(f"配置文件不存在,无法修补: {self.config_path}")
return
try:
# 读取配置内容
with open(self.config_path, "r", encoding="utf-8") as f:
config_content = f.read()
# 确保配置包含必要的端口设置
has_patch = False
# 这里需要检查配置是否为有效的YAML并进行适当修补
# 为简单起见,我们只检查和添加一些基本端口配置
if "mixed-port:" not in config_content and "port:" not in config_content:
# 添加混合端口配置
config_content = "mixed-port: 7890\n" + config_content
has_patch = True
# 不要添加重复的external-controller配置
if "external-controller:" not in config_content:
# 添加API控制器配置 (兼容Clash Meta)
config_content = "external-controller: 127.0.0.1:9090\n" + config_content
has_patch = True
# Clash Meta特定配置
if "find-process-mode:" not in config_content:
config_content = "find-process-mode: strict\n" + config_content
has_patch = True
# 确保启用了API
if "secret:" not in config_content:
config_content = "secret: ''\n" + config_content
has_patch = True
# 尝试解析YAML并修复代理组引用问题
try:
import yaml
logger.info("正在使用PyYAML解析和修复配置")
# 解析配置
config_yaml = yaml.safe_load(config_content)
# 检查解析结果是否为有效字典
if not isinstance(config_yaml, dict):
logger.error("配置文件解析结果不是有效的YAML字典,无法进行修复")
# 使用简单的文本分析,避免添加重复的配置
has_proxy_groups = "proxy-groups:" in config_content
has_global_group = "GLOBAL" in config_content or "- name: GLOBAL" in config_content
if not has_proxy_groups and not has_global_group:
logger.info("未检测到proxy-groups或GLOBAL策略组,添加基本的GLOBAL策略组")
config_content += "\nproxy-groups:\n - name: GLOBAL\n type: select\n proxies:\n - DIRECT\n"
has_patch = True
else:
logger.info("检测到已有proxy-groups或GLOBAL配置,跳过添加")
else:
# --- 开始修复逻辑 ---
groups_fixed = False
proxies_list = config_yaml.get("proxies")
# 1. 检查代理列表是否有效
if not proxies_list or not isinstance(proxies_list, list):
logger.error("配置文件缺少有效的 'proxies' 列表或列表为空!将仅使用 DIRECT 代理。请检查原始订阅内容。")
# 设置一个最小化的 proxies 和 proxy-groups
# 不要在 proxies 中显式定义 DIRECT,Clash 内置了它
config_yaml["proxies"] = []
config_yaml["proxy-groups"] = [{
"name": "GLOBAL",
"type": "select",
"proxies": ["DIRECT"] # 直接引用内置的 DIRECT
}]
proxy_names = {"DIRECT", "REJECT"} # 仍然需要知道内置名称
groups_fixed = True # 标记需要重新生成配置内容
has_patch = True
else:
# 2. 代理列表有效,获取所有代理节点名称
proxy_names = set()
valid_proxies_list = []
for proxy in proxies_list:
if isinstance(proxy, dict) and "name" in proxy:
proxy_names.add(proxy["name"])
valid_proxies_list.append(proxy) # 只保留有效的代理定义
else:
logger.warning(f"配置文件中的 'proxies' 列表包含无效条目: {proxy}")
# 如果所有条目都无效,则回退
if not valid_proxies_list:
logger.error("配置文件中的 'proxies' 列表所有条目均无效!将仅使用 DIRECT 代理。")
config_yaml["proxies"] = [{"name": "DIRECT", "type": "direct"}]
config_yaml["proxy-groups"] = [{
"name": "GLOBAL",
"type": "select",
"proxies": ["DIRECT"]
}]
proxy_names = {"DIRECT", "REJECT"}
groups_fixed = True
has_patch = True
else:
config_yaml["proxies"] = valid_proxies_list # 更新为仅包含有效代理的列表
# 添加内置代理
proxy_names.add("DIRECT")
proxy_names.add("REJECT")
# 3. 修复或添加代理组
proxy_groups = config_yaml.get("proxy-groups", [])
if not isinstance(proxy_groups, list):
logger.warning("'proxy-groups' 不是一个列表,将重新创建")
proxy_groups = []
config_yaml["proxy-groups"] = proxy_groups
groups_fixed = True
has_global = False
valid_groups = []
for group in proxy_groups:
if not isinstance(group, dict) or "name" not in group or "type" not in group:
logger.warning(f"'proxy-groups' 中包含无效组定义: {group}")
groups_fixed = True
continue # 跳过无效组
if "proxies" in group:
if not isinstance(group["proxies"], list):
logger.warning(f"代理组 '{group['name']}' 的 'proxies' 不是列表,设置为 ['DIRECT']")
group["proxies"] = ["DIRECT"]
groups_fixed = True
else:
# 删除组中引用的不存在的代理
original_group_proxies = group["proxies"].copy()
group["proxies"] = [p for p in group["proxies"] if p in proxy_names or p == group["name"]]
if len(group["proxies"]) != len(original_group_proxies):
removed = set(original_group_proxies) - set(group["proxies"])
if removed:
logger.warning(f"删除代理组 '{group['name']}' 中的无效引用: {', '.join(removed)}")
groups_fixed = True
# 确保代理组至少有一个有效代理
if not group["proxies"]:
logger.warning(f"为空代理组 '{group['name']}' 添加默认代理: DIRECT")
group["proxies"].append("DIRECT")
groups_fixed = True
else:
# 如果组定义没有proxies列表(例如 url-test),通常是正常的
pass
valid_groups.append(group)
if group["name"] == "GLOBAL":
has_global = True
# 更新为只包含有效组定义的列表
config_yaml["proxy-groups"] = valid_groups
# 4. 如果没有 GLOBAL 组,添加一个
if not has_global:
logger.info("添加 GLOBAL 策略组")
# 确定要放入 GLOBAL 组的代理 (所有已知真实代理 + DIRECT)
global_proxies = [p for p in proxy_names if p not in ("DIRECT", "REJECT")]
global_proxies.sort() # 可选:排序
global_proxies.insert(0, "DIRECT") # 将 DIRECT 放前面
config_yaml["proxy-groups"].insert(0, {
"name": "GLOBAL",
"type": "select",
"proxies": global_proxies
})
groups_fixed = True
has_patch = True
# 5. 如果进行了修复,重新序列化 YAML
if groups_fixed:
try:
config_content = yaml.dump(config_yaml, sort_keys=False, allow_unicode=True, default_flow_style=None)
has_patch = True # 确保标记为已修改
except Exception as dump_err:
logger.error(f"重新序列化 YAML 失败: {dump_err}")
# 序列化失败,放弃修复,依赖原始 content + 基础 patch
# 重新读取原始content,只应用基础patch
with open(self.config_path, "r", encoding="utf-8") as f:
config_content = f.read()
if "mixed-port:" not in config_content and "port:" not in config_content: config_content = "mixed-port: 7890\n" + config_content
if "external-controller:" not in config_content: config_content = "external-controller: 127.0.0.1:9090\n" + config_content
if "find-process-mode:" not in config_content: config_content = "find-process-mode: strict\n" + config_content
if "secret:" not in config_content: config_content = "secret: ''\n" + config_content
has_patch = True # 标记基础 patch 已应用
# --- 处理 YAML 导入或解析错误的回退逻辑 ---
except ImportError as e:
logger.error(f"导入PyYAML模块失败: {str(e)}. 无法执行详细配置修复。")
# 使用简单的文本分析,避免添加重复的配置
has_proxy_groups = "proxy-groups:" in config_content
has_global_group = "GLOBAL" in config_content or "- name: GLOBAL" in config_content
if not has_proxy_groups and not has_global_group:
logger.info("未检测到proxy-groups或GLOBAL策略组,添加基本的GLOBAL策略组")
config_content += "\nproxy-groups:\n - name: GLOBAL\n type: select\n proxies:\n - DIRECT\n"
has_patch = True
else:
logger.info("检测到已有proxy-groups或GLOBAL配置,跳过添加")
except Exception as yaml_err:
logger.error(f"处理YAML配置时出错: {str(yaml_err)}. 无法执行详细配置修复。")
# 使用简单的文本分析,避免添加重复的配置
has_proxy_groups = "proxy-groups:" in config_content
has_global_group = "GLOBAL" in config_content or "- name: GLOBAL" in config_content
if not has_proxy_groups and not has_global_group:
logger.info("未检测到proxy-groups或GLOBAL策略组,添加基本的GLOBAL策略组")
config_content += "\nproxy-groups:\n - name: GLOBAL\n type: select\n proxies:\n - DIRECT\n"
has_patch = True
else:
logger.info("检测到已有proxy-groups或GLOBAL配置,跳过添加")
# --- 写回文件 ---
if has_patch:
logger.info("检测到配置更改,正在写回文件...")
try:
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(config_content)
logger.info("已修补配置文件并成功保存")
except Exception as write_err:
logger.error(f"写回配置文件失败: {write_err}")
# 记录错误,但允许程序继续
else:
logger.info("配置文件无需修补")
except Exception as e:
logger.error(f"修补配置文件过程中发生意外错误: {str(e)}")
def _mask_url(self, url):
"""
遮蔽URL中的敏感信息用于日志记录
Args:
url: 原始URL
Returns:
str: 遮蔽后的URL
"""
try:
parsed = urlparse(url)
netloc = parsed.netloc
# 如果URL包含用户名和密码,则遮蔽密码
if "@" in netloc:
userpass, host = netloc.split("@", 1)
if ":" in userpass:
user, _ = userpass.split(":", 1)
netloc = f"{user}:***@{host}"
masked_url = url.replace(parsed.netloc, netloc)
# 确保不显示完整的token或密钥
if "?" in masked_url:
base, query = masked_url.split("?", 1)
masked_url = f"{base}?****"
return masked_url
except Exception:
# 如果解析失败,返回更简单的遮蔽
return f"{url[:10]}...{url[-5:]}" if len(url) > 15 else "***"