clash / app /clash_manager.py
clash-linux's picture
Upload 17 files
03d4200 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Clash管理器 - 负责Clash Core进程的启动、停止和API调用
"""
import os
import time
import signal
import logging
import subprocess
import requests
import json
from .debug_tools import run_debug_diagnostics
logger = logging.getLogger(__name__)
class ClashManager:
"""管理Clash Core进程和与其API的交互"""
def __init__(self, config_path, clash_path, api_port=9090, proxy_port=7890):
"""
初始化Clash管理器
Args:
config_path: Clash配置文件路径
clash_path: Clash可执行文件路径
api_port: Clash API监听端口
proxy_port: Clash代理监听端口
"""
self.config_path = os.path.abspath(config_path)
self.clash_path = os.path.abspath(clash_path)
self.api_port = api_port
self.proxy_port = proxy_port
self.api_base_url = f"http://127.0.0.1:{api_port}"
self.clash_process = None
# 确保Clash可执行文件存在
if not os.path.exists(clash_path):
raise FileNotFoundError(f"Clash可执行文件未找到: {clash_path}")
def start_clash(self):
"""启动Clash Core进程"""
if self.clash_process and self.clash_process.poll() is None:
logger.info("Clash Core已经在运行中")
return
# 确保配置文件存在
if not os.path.exists(self.config_path):
# 检查目录是否存在
config_dir = os.path.dirname(self.config_path)
if not os.path.exists(config_dir):
try:
os.makedirs(config_dir, exist_ok=True)
logger.info(f"创建了配置目录: {config_dir}")
except Exception as e:
logger.error(f"无法创建配置目录: {str(e)}")
# 如果目录存在但文件不存在,尝试创建一个基础配置
try:
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(self._get_base_config())
logger.info(f"创建了基础配置文件: {self.config_path}")
except Exception as e:
logger.error(f"无法创建配置文件: {str(e)}")
raise FileNotFoundError(f"Clash配置文件未找到且无法创建: {self.config_path}")
# 验证配置文件内容
self._validate_config_file()
# 运行调试诊断
logger.info("运行环境诊断...")
try:
diagnostics = run_debug_diagnostics(self.clash_path, self.config_path)
if not diagnostics.get("clash_binary", {}).get("is_executable", False):
logger.warning("Clash Core不是可执行文件,尝试设置执行权限")
try:
os.chmod(self.clash_path, 0o755)
logger.info("已设置执行权限")
except Exception as e:
logger.error(f"设置执行权限失败: {str(e)}")
except Exception as e:
logger.error(f"运行诊断时出错: {str(e)}")
# 设置Clash命令行参数 (兼容Clash Meta)
cmd = [
self.clash_path,
"-f", self.config_path,
"-d", os.path.dirname(self.config_path)
]
# 为Clash Meta添加额外参数
if "meta" in self.clash_path.lower():
# Clash Meta特有参数
cmd.extend([
"-ext-ctl", f"127.0.0.1:{self.api_port}",
# 如果需要可以添加更多Clash Meta特有参数
])
else:
# 原始Clash参数
cmd.extend([
"-ext-ctl", f"127.0.0.1:{self.api_port}",
"-ext-ui", "" # 禁用外部UI
])
# 启动Clash进程
logger.info(f"正在启动Clash Core: {' '.join(cmd)}")
# 首先尝试检查Clash可执行文件是否存在并且可执行
if not os.path.exists(self.clash_path):
raise FileNotFoundError(f"Clash可执行文件未找到: {self.clash_path}")
# 检查Clash文件是否有执行权限
try:
# 尝试设置执行权限
os.chmod(self.clash_path, 0o755)
logger.info(f"已设置Clash Core的执行权限")
except Exception as e:
logger.warning(f"无法设置执行权限: {str(e)}, 尝试继续执行...")
# 启动进程并捕获输出
try:
self.clash_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
# 等待Clash启动
logger.info("等待Clash Core启动...")
time.sleep(2)
# 检查进程是否成功启动
if self.clash_process.poll() is not None:
# 如果进程已经退出,获取错误输出
stdout, stderr = self.clash_process.communicate()
logger.error(f"Clash启动失败,退出代码: {self.clash_process.returncode}")
logger.error(f"标准输出: {stdout}")
logger.error(f"错误输出: {stderr}")
# 如果没有错误输出,尝试读取配置文件周围的日志
if not stderr.strip():
try:
# 查看配置文件内容
with open(self.config_path, 'r', encoding='utf-8') as f:
config_content = f.read()
logger.error(f"配置文件内容预览 (前100字符): {config_content[:100]}")
except Exception as file_err:
logger.error(f"读取配置文件失败: {str(file_err)}")
# 创建更有意义的错误消息
error_message = stderr or stdout or "无错误信息,可能是程序无法执行或立即崩溃"
raise RuntimeError(f"Clash启动失败: {error_message}")
# 进程仍在运行,验证API是否可访问
logger.info("进程仍在运行,验证API...")
try:
# 尝试多次连接API,有时需要一些时间
max_retries = 3
for i in range(max_retries):
try:
self._call_api("GET", "/version")
logger.info("Clash API已就绪")
return # 成功,退出函数
except Exception as api_err:
if i < max_retries - 1:
logger.warning(f"API连接尝试 {i+1}/{max_retries} 失败: {str(api_err)},重试...")
time.sleep(1)
else:
# 最后一次尝试失败,捕获并记录错误
stdout, stderr = self.clash_process.communicate(timeout=1)
logger.error(f"API连接失败,标准输出: {stdout}")
logger.error(f"API连接失败,错误输出: {stderr}")
# 杀掉进程并抛出异常
self.stop_clash()
raise RuntimeError(f"无法连接到Clash API: {str(api_err)},进程输出: {stderr or stdout}")
except Exception as e:
# 捕获其他异常
self.stop_clash()
raise RuntimeError(f"验证API时出错: {str(e)}")
except (subprocess.SubprocessError, OSError) as e:
# 进程启动失败
logger.error(f"启动Clash进程时出错: {str(e)}")
raise RuntimeError(f"无法启动Clash进程: {str(e)}")
def stop_clash(self):
"""停止Clash Core进程"""
if self.clash_process and self.clash_process.poll() is None:
logger.info("正在停止Clash Core...")
# 尝试优雅地终止进程
self.clash_process.terminate()
# 等待进程终止
try:
self.clash_process.wait(timeout=5)
except subprocess.TimeoutExpired:
# 如果进程没有及时终止,强制结束
logger.warning("Clash进程未响应终止信号,强制结束...")
self.clash_process.kill()
self.clash_process = None
logger.info("Clash Core已停止")
def restart_clash(self):
"""重启Clash Core进程"""
logger.info("正在重启Clash Core...")
self.stop_clash()
time.sleep(1) # 给进程一些时间完全终止
self.start_clash()
logger.info("Clash Core已重启")
def get_nodes(self):
"""
获取所有可用的代理节点名称列表
Returns:
list: 节点名称列表
"""
response = self._call_api("GET", "/proxies")
proxies = response.get("proxies", {})
# 过滤出实际的代理节点(排除DIRECT, REJECT等内置代理和策略组)
node_names = []
for name, proxy in proxies.items():
if proxy.get("type") not in ["Direct", "Reject", "Selector", "URLTest", "Fallback", "LoadBalance"]:
node_names.append(name)
return node_names
def switch_node(self, node_name):
"""
切换到指定的代理节点
Args:
node_name: 节点名称
Raises:
ValueError: 如果节点名称无效
"""
# 获取所有节点以验证目标节点存在
all_nodes = self.get_nodes()
if node_name not in all_nodes:
raise ValueError(f"无效的节点名称: {node_name}")
# 切换GLOBAL策略组到指定节点
# 注意:这里假设使用GLOBAL作为顶级策略组,你可能需要根据实际配置调整
try:
self._call_api("PUT", "/proxies/GLOBAL", json={"name": node_name})
logger.info(f"已切换到节点: {node_name}")
except Exception as e:
raise RuntimeError(f"切换节点失败: {str(e)}")
def get_current_node(self):
"""
获取当前使用的节点名称
Returns:
str: 当前节点名称
"""
# 获取GLOBAL策略组的当前选择
# 注意:这里假设使用GLOBAL作为顶级策略组,你可能需要根据实际配置调整
response = self._call_api("GET", "/proxies/GLOBAL")
return response.get("now", "unknown")
def _call_api(self, method, endpoint, **kwargs):
"""
调用Clash的API
Args:
method: HTTP方法 (GET, POST, PUT等)
endpoint: API端点路径
**kwargs: 传递给requests的其他参数
Returns:
dict: API响应的JSON数据
Raises:
RuntimeError: 如果API调用失败
"""
url = f"{self.api_base_url}{endpoint}"
logger.debug(f"调用Clash API: {method} {url}")
try:
response = requests.request(method, url, timeout=10, **kwargs)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"Clash API调用失败: {str(e)}")
raise RuntimeError(f"Clash API调用失败: {str(e)}")
def __del__(self):
"""析构函数,确保进程在对象销毁时被终止"""
self.stop_clash()
def _validate_config_file(self):
"""验证配置文件内容,确保基本配置存在"""
try:
with open(self.config_path, "r", encoding="utf-8") as f:
content = f.read()
# 检查文件大小
if len(content) < 10:
logger.warning(f"配置文件内容过短: {len(content)} 字节")
# 尝试使用基础配置替换
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(self._get_base_config())
logger.info("已使用基础配置替换")
return
# 检查基本配置是否存在 (非严格YAML解析,简单文本检查)
missing_configs = []
if "mixed-port:" not in content and "port:" not in content:
missing_configs.append("端口配置")
if "proxies:" not in content:
missing_configs.append("代理配置")
if missing_configs:
logger.warning(f"配置文件缺少: {', '.join(missing_configs)}")
# 如果缺少关键配置,尝试修复
has_patch = False
if "mixed-port:" not in content and "port:" not in content:
content = f"mixed-port: {self.proxy_port}\n" + content
has_patch = True
if "external-controller:" not in content:
content = f"external-controller: 127.0.0.1:{self.api_port}\n" + content
has_patch = True
if "proxies:" not in content:
content += "\nproxies:\n - name: DIRECT\n type: Direct\n"
has_patch = True
if has_patch:
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(content)
logger.info("已修补配置文件")
except Exception as e:
logger.error(f"验证配置文件时出错: {str(e)}")
# 如果验证失败,尝试使用基础配置
try:
with open(self.config_path, "w", encoding="utf-8") as f:
f.write(self._get_base_config())
logger.info("已使用基础配置替换")
except Exception as write_err:
logger.error(f"无法写入基础配置: {str(write_err)}")
def _get_base_config(self):
"""返回基础Clash配置"""
return f"""# 基础Clash配置
mixed-port: {self.proxy_port}
allow-lan: true
mode: Rule
log-level: info
external-controller: 127.0.0.1:{self.api_port}
secret: ""
proxies:
- name: DIRECT
type: Direct
proxy-groups:
- name: GLOBAL
type: select
proxies:
- DIRECT
rules:
- MATCH,DIRECT
"""