clash / app /sub_manager.py
clash-linux's picture
Upload 21 files
fda2131 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
订阅管理器 - 负责下载订阅内容并转换为Clash配置
"""
import os
import logging
import subprocess
import requests
import time
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
class SubscriptionManager:
"""管理订阅链接的下载和配置转换"""
MANUAL_CONFIG_MARKER = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", ".use_manual_config")
def __init__(self, sub_url, config_path):
"""
初始化订阅管理器
Args:
sub_url: 订阅链接URL
config_path: 生成的Clash配置文件保存路径
"""
self.sub_url = sub_url
self.config_path = os.path.abspath(config_path)
self.raw_config_path = f"{self.config_path}.raw"
self.subconverter_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"subconverter", "subconverter"
)
# 检查是否设置了订阅链接
if not sub_url:
raise ValueError("未设置订阅链接 (SUB_URL)")
# 确保配置目录存在
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
# 检查subconverter可执行文件是否存在
if not os.path.exists(self.subconverter_path):
raise FileNotFoundError(f"subconverter可执行文件未找到: {self.subconverter_path}")
def load_and_convert_sub(self):
"""下载订阅并转换为Clash配置,或使用现有配置"""
# 检查是否使用了手动上传的配置
if os.path.exists(self.MANUAL_CONFIG_MARKER):
logger.info("检测到手动配置文件标记,跳过订阅下载和转换,直接使用现有文件。")
if not os.path.exists(self.config_path):
logger.error("手动配置模式,但配置文件 config.yaml 不存在!")
raise FileNotFoundError("手动配置模式,但配置文件 config.yaml 不存在!")
# 不再对 手动配置文件 执行 _patch_config
# self._patch_config()
logger.info("已确认手动配置文件存在。")
return
# 如果没有设置订阅URL,且配置文件已存在,则直接使用
if not self.sub_url:
if os.path.exists(self.config_path):
logger.info("未设置订阅URL,使用现有的配置文件")
self._patch_config()
return
else:
logger.error("未设置订阅URL,且配置文件不存在!")
raise ValueError("必须提供订阅URL或已存在的配置文件")
# 下载订阅
try:
logger.info(f"正在下载订阅: {self._mask_url(self.sub_url)}")
content = self._download_subscription()
# 将原始订阅内容保存到 .raw 文件 (使用文本模式和UTF-8编码)
with open(self.raw_config_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info(f"成功下载订阅,大小: {len(content.encode('utf-8'))} 字节") # 计算字节大小
except Exception as e:
logger.error(f"下载订阅失败: {str(e)}")
# 如果下载失败,但旧配置文件存在,则继续使用旧的
if os.path.exists(self.config_path):
logger.warning("下载失败,继续使用旧的配置文件")
self._patch_config()
return
else:
raise RuntimeError(f"下载订阅失败: {str(e)}")
# 转换配置
try:
logger.info("正在将订阅转换为Clash配置")
self._convert_to_clash(self.raw_config_path)
except Exception as e:
logger.error(f"转换配置失败: {str(e)}")
# 如果转换失败,但旧配置文件存在,尝试使用旧的
if os.path.exists(self.config_path):
logger.warning("转换失败,尝试使用旧的配置文件")
# 如果转换失败,且没有旧配置,尝试直接使用原始下载内容
elif os.path.exists(self.raw_config_path):
logger.warning("转换失败,尝试直接使用原始订阅内容作为配置文件")
try:
with open(self.raw_config_path, "r", encoding="utf-8") as infile, \
open(self.config_path, "w", encoding="utf-8") as outfile:
outfile.write(infile.read())
logger.info("已将原始订阅内容复制为配置文件")
except Exception as copy_err:
logger.error(f"复制原始订阅内容失败: {copy_err}")
raise RuntimeError(f"转换配置失败,且无法使用原始订阅: {str(e)}")
else:
raise RuntimeError(f"转换配置失败: {str(e)}")
# 修补配置文件(添加端口、API等)
self._patch_config()
return self.config_path
def _download_subscription(self):
"""
下载订阅内容
Returns:
str: 订阅内容文本
Raises:
RuntimeError: 如果下载失败
"""
try:
headers = {
"User-Agent": "ClashforWindows/0.19.0",
"Accept": "*/*",
}
response = requests.get(self.sub_url, headers=headers, timeout=30)
response.raise_for_status()
content = response.text
if not content or len(content) < 10:
raise RuntimeError("下载的订阅内容为空或过短")
logger.info(f"成功下载订阅,大小: {len(content)} 字节")
return content
except requests.RequestException as e:
logger.error(f"下载订阅失败: {str(e)}")
raise RuntimeError(f"下载订阅失败: {str(e)}")
def _convert_to_clash(self, input_file):
"""
使用subconverter将订阅内容转换为Clash配置
Args:
input_file: 包含订阅内容的文件路径
Raises:
RuntimeError: 如果转换失败
"""
logger.info(f"正在将订阅转换为Clash配置")
logger.info(f"输入文件: {input_file}, 配置路径: {self.config_path}")
# 确保数据目录存在
data_dir = os.path.dirname(self.config_path)
if not os.path.exists(data_dir):
logger.info(f"创建数据目录: {data_dir}")
try:
os.makedirs(data_dir, exist_ok=True)
except Exception as e:
logger.error(f"创建数据目录失败: {str(e)}")
# 尝试直接读取订阅内容,确认它是否已经是Clash配置
try:
with open(input_file, "r", encoding="utf-8") as f:
content = f.read()
# 简单检查是否已经是Clash配置
if "proxies:" in content and ("port:" in content or "mixed-port:" in content):
logger.info("检测到输入文件已是Clash配置格式,直接使用")
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(content)
return
except Exception as e:
logger.warning(f"读取输入文件时出错: {str(e)},将尝试转换")
# 准备subconverter命令
cmd = [
self.subconverter_path,
"-g", # 生成配置文件
"--target", "clash", # 输出格式为Clash (修正为 --target)
"--url", input_file, # 修正为 --url 参数
"--output", self.config_path, # 输出文件
"--include-remarks", ".*" # 包含所有节点
]
logger.info(f"执行命令: {' '.join(cmd)}")
# 如果subconverter不存在或执行出错,我们就尝试直接使用订阅内容
if not os.path.exists(self.subconverter_path):
logger.warning("subconverter不存在,尝试直接使用订阅内容")
try:
with open(input_file, "r", encoding="utf-8") as f:
content = f.read()
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info(f"已将订阅内容直接写入到: {self.config_path}")
# 验证文件是否成功写入
if os.path.exists(self.config_path):
logger.info(f"文件已成功写入,大小: {os.path.getsize(self.config_path)} 字节")
else:
logger.error(f"文件写入失败,{self.config_path} 不存在")
except Exception as e:
logger.error(f"直接使用订阅内容时出错: {str(e)}")
raise RuntimeError(f"写入配置文件失败: {str(e)}")
return
try:
# 执行subconverter
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
stdout, stderr = process.communicate(timeout=30)
logger.info(f"subconverter输出: {stdout[:200]}...") # 限制日志长度
if process.returncode != 0:
logger.error(f"subconverter执行失败: {stderr}")
# 错误处理:尝试直接使用订阅内容
try:
with open(input_file, "r", encoding="utf-8") as f:
content = f.read()
# 确保它是有效的配置,如果是普通订阅格式,添加基本的Clash头
if "proxies:" not in content:
content = self._add_clash_headers() + content
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(content)
logger.warning("尝试直接使用订阅内容作为配置文件")
# 验证文件是否成功写入
if os.path.exists(self.config_path):
logger.info(f"文件已成功写入,大小: {os.path.getsize(self.config_path)} 字节")
else:
logger.error(f"文件写入失败,{self.config_path} 不存在")
except Exception as e:
logger.error(f"使用订阅内容作为配置文件时出错: {str(e)}")
raise RuntimeError(f"写入配置文件失败: {str(e)}")
else:
logger.info("成功转换配置")
# 验证输出文件是否存在
if os.path.exists(self.config_path):
logger.info(f"配置文件已生成,路径: {self.config_path},大小: {os.path.getsize(self.config_path)} 字节")
else:
# 如果文件不存在但subconverter返回成功,尝试查找配置文件
logger.warning(f"subconverter声称成功但配置文件不存在: {self.config_path}")
# 查找当前目录下可能生成的配置文件
possible_files = [f for f in os.listdir('.') if f.endswith('.yaml') or f.endswith('.yml')]
if possible_files:
logger.info(f"找到可能的配置文件: {possible_files}")
# 尝试复制找到的第一个文件
try:
import shutil
shutil.copy(possible_files[0], self.config_path)
logger.info(f"已复制 {possible_files[0]}{self.config_path}")
except Exception as e:
logger.error(f"复制文件失败: {str(e)}")
else:
logger.error("未找到任何可能的配置文件")
# 尝试使用原始订阅内容作为配置
try:
with open(input_file, "r", encoding="utf-8") as f:
content = f.read()
# 确保它是有效的配置,如果是普通订阅格式,添加基本的Clash头
if "proxies:" not in content:
content = self._add_clash_headers() + content
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(content)
logger.warning("使用订阅内容作为配置文件")
except Exception as e:
logger.error(f"使用订阅内容时出错: {str(e)}")
raise RuntimeError(f"写入配置文件失败: {str(e)}")
except (subprocess.SubprocessError, OSError) as e:
logger.error(f"执行subconverter时出错: {str(e)}")
raise RuntimeError(f"配置转换失败: {str(e)}")
def _add_clash_headers(self):
"""添加基本的Clash配置头"""
return """# 自动生成的Clash配置
port: 7890
socks-port: 7891
mixed-port: 7890
allow-lan: true
mode: Rule
log-level: info
external-controller: 127.0.0.1:9090
secret: ""
"""
def _patch_config(self):
"""
修改配置文件以确保端口设置正确,并兼容Clash Meta
"""
# 检查配置文件是否存在
if not os.path.exists(self.config_path):
logger.warning(f"配置文件不存在,无法修补: {self.config_path}")
return
try:
# 读取配置内容
with open(self.config_path, "r", encoding="utf-8") as f:
config_content = f.read()
# 确保配置包含必要的端口设置
has_patch = False
# 这里需要检查配置是否为有效的YAML并进行适当修补
# 为简单起见,我们只检查和添加一些基本端口配置
if "mixed-port:" not in config_content and "port:" not in config_content:
# 添加混合端口配置
config_content = "mixed-port: 7890\n" + config_content
has_patch = True
# 不要添加重复的external-controller配置
if "external-controller:" not in config_content:
# 添加API控制器配置 (兼容Clash Meta)
config_content = "external-controller: 127.0.0.1:9090\n" + config_content
has_patch = True
# Clash Meta特定配置
if "find-process-mode:" not in config_content:
config_content = "find-process-mode: strict\n" + config_content
has_patch = True
# 确保启用了API
if "secret:" not in config_content:
config_content = "secret: ''\n" + config_content
has_patch = True
# 尝试解析YAML并修复代理组引用问题
try:
import yaml
logger.info("正在使用PyYAML解析和修复配置")
# 解析配置
config_yaml = yaml.safe_load(config_content)
# 检查是否有代理和代理组
if config_yaml and isinstance(config_yaml, dict):
if "proxies" in config_yaml and "proxy-groups" in config_yaml:
# 获取所有代理节点名称
proxy_names = set()
for proxy in config_yaml.get("proxies", []):
if isinstance(proxy, dict) and "name" in proxy:
proxy_names.add(proxy["name"])
# 添加内置代理
proxy_names.add("DIRECT")
proxy_names.add("REJECT")
# 修复代理组引用
groups_fixed = False
for group in config_yaml.get("proxy-groups", []):
if isinstance(group, dict) and "proxies" in group:
# 删除组中引用的不存在的代理
valid_proxies = []
for proxy in group["proxies"]:
if proxy in proxy_names or proxy == group.get("name", ""):
valid_proxies.append(proxy)
else:
logger.warning(f"删除代理组 {group.get('name', '未知')} 中的无效引用: {proxy}")
groups_fixed = True
# 确保代理组至少有一个有效代理
if not valid_proxies:
valid_proxies.append("DIRECT")
logger.warning(f"为空代理组 {group.get('name', '未知')} 添加默认代理: DIRECT")
groups_fixed = True
group["proxies"] = valid_proxies
# 检查是否有GLOBAL组,如果没有,添加一个
has_global = False
for group in config_yaml.get("proxy-groups", []):
if isinstance(group, dict) and group.get("name") == "GLOBAL":
has_global = True
break
if not has_global:
# 添加GLOBAL策略组
logger.info("添加GLOBAL策略组")
if "proxy-groups" not in config_yaml:
config_yaml["proxy-groups"] = []
# 确定要放入GLOBAL组的代理
global_proxies = ["DIRECT"]
# 添加所有代理节点到GLOBAL组
for proxy in config_yaml.get("proxies", []):
if isinstance(proxy, dict) and "name" in proxy:
proxy_name = proxy["name"]
if proxy_name != "DIRECT" and proxy_name != "REJECT":
global_proxies.append(proxy_name)
config_yaml["proxy-groups"].insert(0, {
"name": "GLOBAL",
"type": "select",
"proxies": global_proxies
})
groups_fixed = True
has_patch = True
# 如果修复了代理组,重新生成配置
if groups_fixed:
config_content = yaml.dump(config_yaml, sort_keys=False, allow_unicode=True)
has_patch = True
else:
# 配置缺少代理或代理组,添加基本结构
if "proxies" not in config_yaml:
config_yaml["proxies"] = [{"name": "DIRECT", "type": "direct"}]
has_patch = True
if "proxy-groups" not in config_yaml:
config_yaml["proxy-groups"] = [{
"name": "GLOBAL",
"type": "select",
"proxies": ["DIRECT"]
}]
has_patch = True
# 更新配置内容
config_content = yaml.dump(config_yaml, sort_keys=False, allow_unicode=True)
else:
# 配置为空或无效,添加基本配置
logger.warning("配置为空或无效,添加基本配置结构")
# 使用简单的文本分析,避免添加重复的配置
has_proxy_groups = "proxy-groups:" in config_content
has_global_group = "GLOBAL" in config_content or "- name: GLOBAL" in config_content
if not has_proxy_groups and not has_global_group:
logger.info("未检测到proxy-groups或GLOBAL策略组,添加基本的GLOBAL策略组")
config_content += """
proxy-groups:
- name: GLOBAL
type: select
proxies:
- DIRECT
"""
has_patch = True
else:
logger.info("检测到已有proxy-groups或GLOBAL配置,跳过添加")
except ImportError as e:
logger.error(f"导入PyYAML模块失败: {str(e)}")
# 使用简单的文本分析,避免添加重复的配置
has_proxy_groups = "proxy-groups:" in config_content
has_global_group = "GLOBAL" in config_content or "- name: GLOBAL" in config_content
if not has_proxy_groups and not has_global_group:
logger.info("未检测到proxy-groups或GLOBAL策略组,添加基本的GLOBAL策略组")
config_content += """
proxy-groups:
- name: GLOBAL
type: select
proxies:
- DIRECT
"""
has_patch = True
else:
logger.info("检测到已有proxy-groups或GLOBAL配置,跳过添加")
except Exception as yaml_err:
logger.error(f"处理YAML配置时出错: {str(yaml_err)}")
# 使用简单的文本分析,避免添加重复的配置
has_proxy_groups = "proxy-groups:" in config_content
has_global_group = "GLOBAL" in config_content or "- name: GLOBAL" in config_content
if not has_proxy_groups and not has_global_group:
logger.info("未检测到proxy-groups或GLOBAL策略组,添加基本的GLOBAL策略组")
config_content += """
proxy-groups:
- name: GLOBAL
type: select
proxies:
- DIRECT
"""
has_patch = True
else:
logger.info("检测到已有proxy-groups或GLOBAL配置,跳过添加")
# 如果我们修改了配置,保存回文件
if has_patch:
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(config_content)
logger.info("已修补配置文件以添加必要的设置")
except Exception as e:
logger.error(f"修补配置文件时出错: {str(e)}")
def _mask_url(self, url):
"""
遮蔽URL中的敏感信息用于日志记录
Args:
url: 原始URL
Returns:
str: 遮蔽后的URL
"""
try:
parsed = urlparse(url)
netloc = parsed.netloc
# 如果URL包含用户名和密码,则遮蔽密码
if "@" in netloc:
userpass, host = netloc.split("@", 1)
if ":" in userpass:
user, _ = userpass.split(":", 1)
netloc = f"{user}:***@{host}"
masked_url = url.replace(parsed.netloc, netloc)
# 确保不显示完整的token或密钥
if "?" in masked_url:
base, query = masked_url.split("?", 1)
masked_url = f"{base}?****"
return masked_url
except Exception:
# 如果解析失败,返回更简单的遮蔽
return f"{url[:10]}...{url[-5:]}" if len(url) > 15 else "***"