| import logging, sys |
| import streamlit as st |
| import asyncio |
| import aiohttp |
| import yaml |
| import threading |
| import time |
| from datetime import datetime |
| from typing import List, Dict, Optional |
| import warnings |
| import queue |
|
|
| |
# Optional dependency probe: AIOQUIC_AVAILABLE records whether `aioquic` can be
# imported. The module itself is not used here; only the flag is read later.
try:
    import aioquic  # noqa: F401  (imported only to detect availability)
    AIOQUIC_AVAILABLE = True
except ImportError:
    AIOQUIC_AVAILABLE = False
    print("警告: aioquic 未安装,VMess/VLESS/Trojan 将只测试端口连通性")

# Optional dependency probe: SHADOWSOCKS_AVAILABLE records whether PySocks
# (`socks`) can be imported.
try:
    import socks  # noqa: F401  (imported only to detect availability)
    SHADOWSOCKS_AVAILABLE = True
except ImportError:
    SHADOWSOCKS_AVAILABLE = False
    print("警告: PySocks 未安装,Shadowsocks 将只测试端口连通性")

# NOTE(review): this silences *every* warning process-wide, including
# deprecation warnings from aiohttp/Streamlit. Narrow the filter if possible.
warnings.filterwarnings('ignore')

# Log both to a file (truncated each time logging is first configured) and to
# stderr. basicConfig is a no-op when the root logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler('/tmp/streamlit_test.log', mode='w'),
        logging.StreamHandler(sys.stderr),
    ],
)
logger = logging.getLogger(__name__)

# Must be the first Streamlit command of the script run.
st.set_page_config(page_title="节点可用性测试", layout="wide")

# Page-wide CSS tweaks: green progress bar and extra top padding.
st.markdown("""
<style>
    .stProgress > div > div > div > div {
        background-color: #4CAF50;
    }
    .main {
        padding-top: 2rem;
    }
</style>
""", unsafe_allow_html=True)
|
|
|
|
class NodeTester:
    """Parse proxy-node definitions from YAML text and probe them concurrently.

    Nodes are normalised into plain dicts (``name``, ``type``, ``server``,
    ``port`` plus protocol-specific keys).  ``test_all_nodes_async`` fans the
    nodes out over a fixed number of worker coroutines; every outcome is
    appended to ``results`` as ``(success, message)`` and successful nodes are
    collected in ``available_nodes``.

    Probe depth by node type:
      * http / socks5 / unknown: a GET of ``test_url`` through the node.
      * shadowsocks: a SOCKS5 round trip through the node when PySocks is
        importable and a password is configured, otherwise a TCP connect.
      * vmess / vless / trojan: a TCP connect only.
    """

    def __init__(self, yaml_content: str, test_url: str, max_concurrent: int, timeout: int):
        """Store the configuration and initialise empty result/progress state.

        Args:
            yaml_content: Raw text of the uploaded configuration.
            test_url: URL requested through HTTP/SOCKS5 nodes and for the
                direct-connection check.
            max_concurrent: Upper bound on worker coroutines and connections.
            timeout: Total per-request timeout in seconds for aiohttp requests.
        """
        logger.info(f"NodeTester 初始化 - yaml长度={len(yaml_content)}, max_concurrent={max_concurrent}, timeout={timeout}")
        self.yaml_content = yaml_content
        self.test_url = test_url
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.available_nodes: List[Dict] = []
        self.results: List[tuple] = []
        # Workers stop picking up new nodes once this is set to False.
        self.running = True
        # Guards results / available_nodes / _progress, which another thread
        # may read while the workers are running.
        self._lock = threading.Lock()
        self._progress = 0
        self._total = 0
        self._session: Optional[aiohttp.ClientSession] = None
        # Receives one (progress, total, success, message) tuple per tested node.
        self._progress_queue: queue.Queue = queue.Queue()
        # Optional callable invoked as callback(progress, total) per tested node.
        self._progress_callback = None

    def set_progress_callback(self, callback):
        """Register a callable invoked as ``callback(progress, total)`` per tested node."""
        self._progress_callback = callback

    def get_progress(self):
        """Pop the oldest queued progress tuple without blocking.

        Returns:
            A ``(progress, total, success, message)`` tuple, or ``None`` when
            the queue is empty.
        """
        try:
            return self._progress_queue.get_nowait()
        except queue.Empty:
            return None

    def clear_queue(self):
        """Discard every progress tuple currently queued."""
        while True:
            try:
                self._progress_queue.get_nowait()
            except queue.Empty:
                # Only "queue is empty" ends the drain; anything else is a
                # real error and is allowed to surface.
                break

    @staticmethod
    def _normalize_all(entries, normalizer) -> List[Dict]:
        """Apply *normalizer* to every dict in *entries*; ignore anything else."""
        if not isinstance(entries, list):
            return []
        return [normalizer(entry) for entry in entries if isinstance(entry, dict)]

    def parse_yaml(self) -> List[Dict]:
        """Parse ``yaml_content`` and return the normalised nodes that have a server.

        Recognised layouts, checked in this order:
          1. a mapping with a ``proxies`` list (Clash);
          2. a mapping with ``proxy-providers`` whose providers carry an inline
             ``proxy`` list (providers with a ``url`` are not fetched);
          3. a top-level list of node mappings (V2Ray style);
          4. a single mapping treated as one V2Ray-style node.
        If none of these yields a node, the line-based ``_parse_manual``
        fallback is used.

        Returns:
            A list of node dicts; empty when the YAML is invalid or empty.
        """
        try:
            data = yaml.safe_load(self.yaml_content)
        except yaml.YAMLError as e:
            logger.error(f"YAML 解析失败: {e}")
            # NOTE(review): this method may run on a background thread (see
            # test_all_nodes_async callers); confirm st.error behaves there.
            st.error(f"YAML 解析失败: {e}")
            return []

        if not data:
            return []

        nodes: List[Dict] = []
        # The isinstance guards matter: `'proxies' in data` on a str is a
        # substring test and on a list an element test, and indexing either
        # with a string key raises TypeError.
        if isinstance(data, dict) and 'proxies' in data:
            nodes = self._normalize_all(data['proxies'], self._normalize_clash_node)
        elif isinstance(data, dict) and 'proxy-providers' in data:
            providers = data['proxy-providers']
            if isinstance(providers, dict):
                for provider in providers.values():
                    if not isinstance(provider, dict) or 'url' in provider:
                        # Remote providers are intentionally not downloaded.
                        continue
                    nodes.extend(
                        self._normalize_all(provider.get('proxy'), self._normalize_clash_node)
                    )
        elif isinstance(data, list):
            nodes = self._normalize_all(data, self._normalize_v2ray_node)
        elif isinstance(data, dict):
            nodes = self._normalize_all([data], self._normalize_v2ray_node)

        if not nodes:
            nodes = self._parse_manual()

        return [n for n in nodes if n.get('server')]

    def _normalize_clash_node(self, proxy: Dict) -> Dict:
        """Convert one Clash ``proxies`` entry into the internal node dict.

        The Clash type ``ss`` is renamed to ``shadowsocks``; every other type
        is kept as given.  Missing keys receive the defaults listed below.

        NOTE(review): keys not listed here (for example transport or SNI
        options) are dropped, so they are also absent from the exported YAML.
        """
        type_aliases = {'ss': 'shadowsocks'}
        raw_type = proxy.get('type', 'http')

        node = {
            'name': proxy.get('name', ''),
            'type': type_aliases.get(raw_type, raw_type),
        }
        defaults = (
            ('server', ''),
            ('port', 0),
            ('username', ''),
            ('password', ''),
            ('cipher', 'aes-256-gcm'),
            ('uuid', ''),
            ('alterId', 0),
            ('network', 'tcp'),
            ('tls', False),
            ('skip-cert-verify', False),
        )
        for key, fallback in defaults:
            node[key] = proxy.get(key, fallback)
        return node

    def _normalize_v2ray_node(self, node: Dict) -> Dict:
        """Convert one V2Ray-style mapping into the internal node dict.

        V2Ray key names (``ps``, ``add``, ``id``, ``aid``, ``net``) are mapped
        to the internal names.  ``port`` and ``alterId`` fall back to ``0``
        when the value is missing or not an integer, so one malformed entry
        is later reported as a configuration error instead of aborting the
        whole parse with an uncaught ``ValueError``.
        """
        def _as_int(value) -> int:
            try:
                return int(value)
            except (TypeError, ValueError):
                return 0

        return {
            'name': node.get('name', node.get('ps', '')),
            'type': node.get('type', 'vmess'),
            'server': node.get('add', node.get('server', '')),
            'port': _as_int(node.get('port', 0)),
            'uuid': node.get('id', node.get('uuid', '')),
            'alterId': _as_int(node.get('aid', node.get('alterId', 0))),
            'network': node.get('net', 'tcp'),
            'tls': node.get('tls', ''),
        }

    def _parse_manual(self) -> List[Dict]:
        """Line-based fallback parser used when structured parsing finds no nodes.

        A node starts at a line beginning with ``name:`` or ``- name:`` and
        extends over the following lines that are blank or indented deeper
        than that line.  Only ``server``, ``port`` and ``type`` are extracted.

        Returns:
            The nodes for which a ``server`` value was found.
        """
        quote_chars = '"\''
        lines = self.yaml_content.splitlines()
        nodes = []

        i = 0
        while i < len(lines):
            stripped = lines[i].strip()
            if not stripped.startswith(('name:', '- name:')):
                i += 1
                continue

            # maxsplit=1 keeps names that themselves contain "name:" intact.
            node = {'name': stripped.split('name:', 1)[1].strip().strip(quote_chars)}
            indent = len(lines[i]) - len(lines[i].lstrip())
            i += 1

            while i < len(lines):
                curr_line = lines[i]
                curr_indent = len(curr_line) - len(curr_line.lstrip())

                # A non-blank line at the same or a shallower indent starts
                # the next entry.
                if curr_line.strip() and curr_indent <= indent:
                    break

                if 'server:' in curr_line:
                    # Quotes are stripped like for the name; a quoted host is
                    # otherwise not a usable address.
                    node['server'] = curr_line.split('server:', 1)[1].strip().strip(quote_chars)
                elif 'port:' in curr_line:
                    try:
                        node['port'] = int(curr_line.split('port:', 1)[1].strip().strip(quote_chars))
                    except ValueError:
                        # Not a numeric port (or a different key that merely
                        # ends in "port:"); leave the port unset.
                        pass
                elif 'type:' in curr_line:
                    node['type'] = curr_line.split('type:', 1)[1].strip().strip(quote_chars)

                i += 1

            if node.get('server'):
                nodes.append(node)

        return nodes

    async def _test_http_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Probe an HTTP or SOCKS5 proxy by requesting ``test_url`` through it.

        Returns:
            ``(True, message)`` on an HTTP 200 response, otherwise
            ``(False, message)`` describing the failure.
        """
        name = node.get('name', node.get('server', 'Unknown'))
        server = node.get('server')
        port = node.get('port', 0)

        if not server or not port:
            return False, f"⏭️ {name}: 配置错误"

        # NOTE(review): aiohttp's documented proxy support covers HTTP
        # proxies; confirm that socks5:// URLs are accepted by the installed
        # version (otherwise these nodes always land in the generic error
        # branch below).
        scheme = 'socks5' if node.get('type', 'http') in ('socks5', 'socks') else 'http'
        proxy_url = f"{scheme}://{server}:{port}"

        try:
            auth = None
            if node.get('username') and node.get('password'):
                auth = aiohttp.BasicAuth(node['username'], node['password'])

            async with session.get(
                self.test_url,
                proxy=proxy_url,
                # The credentials were previously built but never sent, so
                # authenticated proxies could not pass the test.
                proxy_auth=auth,
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                ssl=not node.get('skip-cert-verify'),
            ) as resp:
                if resp.status == 200:
                    return True, f"✅ {name}"
                return False, f"❌ {name}: HTTP {resp.status}"
        except asyncio.TimeoutError:
            return False, f"⏱️ {name}: 超时"
        except Exception as e:
            # Covers aiohttp.ClientError as well; both are reported the same.
            return False, f"❌ {name}: {str(e)[:30]}"

    async def _test_shadowsocks_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Probe a Shadowsocks node.

        Uses the proxy round trip when PySocks is importable, otherwise a
        plain TCP connect to the node's port.
        """
        name = node.get('name', 'SS节点')
        server = node.get('server')
        port = node.get('port', 0)

        if not server or not port:
            return False, f"⏭️ {name}: 配置错误"

        if not SHADOWSOCKS_AVAILABLE:
            logger.info(f"[SS] {name} 回退到端口测试: {server}:{port}")
            return await self._test_tcp_port(server, port, name)

        logger.info(f"[SS] {name} 尝试真实代理测试: {server}:{port}")
        return await self._test_shadowsocks_real(server, port, name, node)

    async def _test_shadowsocks_real(self, server: str, port: int, name: str, node: Dict) -> tuple:
        """Run the blocking proxy round trip in a worker thread.

        Falls back to a TCP connect when the node has no password.

        NOTE(review): the round trip speaks SOCKS5 to the node and does not
        use the node's cipher or password; confirm that this is the intended
        check for a Shadowsocks server.
        """
        try:
            if not node.get('password', ''):
                return await self._test_tcp_port(server, port, name)

            return await asyncio.to_thread(
                self._test_shadowsocks_sync, server, port, name, node
            )
        except Exception as e:
            return False, f"❌ {name}: {str(e)[:50]}"

    def _test_shadowsocks_sync(self, server: str, port: int, name: str, node: Dict) -> tuple:
        """Blocking proxy round trip; meant to be called from a worker thread.

        Connects through the node (as a SOCKS5 proxy) to each hard-coded test
        host in turn, sends a minimal HTTP request and treats any response
        bytes as success.

        Returns:
            ``(True, message)`` as soon as one host answers, otherwise
            ``(False, message)``.
        """
        import socket

        try:
            import socks
        except ImportError:
            return False, f"⏭️ {name}: PySocks未安装"

        # Literal IPs, so the check does not depend on DNS resolution.
        test_hosts = [
            ('142.250.185.206', 80),
            ('172.64.155.4', 80),
        ]
        request = b'GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n'

        try:
            for host, test_port in test_hosts:
                # A fresh socket per host: a socket cannot be reused after
                # close(), which previously made the second host unreachable.
                s = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.set_proxy(socks.SOCKS5, server, port, username=None, password=None, rdns=True)
                    s.settimeout(10)
                    s.connect((host, test_port))
                    s.sendall(request)
                    if s.recv(1024):
                        return True, f"✅ {name} (SS代理正常)"
                except Exception:
                    # Best effort: try the next test host.
                    continue
                finally:
                    # Always release the socket, also on failure.
                    s.close()

            return False, f"⏭️ {name}: 代理无响应"
        except Exception as e:
            return False, f"❌ {name}: {str(e)[:30]}"

    async def _test_tcp_port(self, server: str, port: int, name: str) -> tuple:
        """Check that a TCP connection to ``server:port`` opens within 3 seconds."""
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(server, port),
                timeout=3
            )
            writer.close()
            await writer.wait_closed()
        except asyncio.TimeoutError:
            return False, f"⏱️ {name}: 端口超时"
        except Exception:
            return False, f"❌ {name}: 端口不可达"
        return True, f"✅ {name} (端口可达)"

    async def _test_vmess_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Probe a VMess/VLESS/Trojan node (TCP reachability on either path)."""
        name = node.get('name', 'VMess节点')
        server = node.get('server')
        port = node.get('port', 0)

        if not server or not port:
            return False, f"⏭️ {name}: 配置错误"

        if AIOQUIC_AVAILABLE:
            return await self._test_quic_protocol(server, port, name, node)
        return await self._test_tcp_port(server, port, name)

    async def _test_quic_protocol(self, server: str, port: int, name: str, node: Dict) -> tuple:
        """Check that a TCP connection to the node opens within 5 seconds.

        Despite the name, no QUIC/VMess/VLESS/Trojan handshake is performed:
        success only means the port accepted a TCP connection.  ``node`` is
        accepted for interface compatibility and is currently unused.
        """
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(server, port),
                timeout=5
            )
            writer.close()
            await writer.wait_closed()
            return True, f"✅ {name} (端口可达)"
        except asyncio.TimeoutError:
            return False, f"⏱️ {name}: 连接超时"
        except Exception as e:
            return False, f"❌ {name}: {str(e)[:30]}"

    async def test_node(self, session: "aiohttp.ClientSession", node: Dict) -> tuple:
        """Dispatch *node* to the probe matching its ``type``.

        Unknown, missing or null types are probed as HTTP proxies.
        """
        # `or 'http'` + str(): a YAML `type: null` or a non-string type must
        # not crash the dispatch.
        node_type = str(node.get('type') or 'http').lower()

        if node_type == 'shadowsocks':
            probe = self._test_shadowsocks_node
        elif node_type in ('vmess', 'vless', 'trojan'):
            probe = self._test_vmess_node
        else:
            probe = self._test_http_node
        return await probe(session, node)

    async def _worker(self, session: "aiohttp.ClientSession", nodes: List[Dict], start_idx: int, step: int):
        """Test ``nodes[start_idx::step]`` one after another.

        Each outcome is recorded in ``results`` (and ``available_nodes`` on
        success), the progress counter is advanced, and the progress callback
        and queue are notified.  The loop stops early once ``running`` is
        false.
        """
        for i in range(start_idx, len(nodes), step):
            if not self.running:
                break

            node = nodes[i]
            try:
                success, message = await self.test_node(session, node)
            except Exception as e:
                # An unexpected error must count as one failed node; letting
                # it escape would end this worker and silently skip the rest
                # of its share of nodes.
                label = node.get('name', 'Unknown') if isinstance(node, dict) else 'Unknown'
                logger.exception(f"节点测试异常: {label}")
                success, message = False, f"❌ {label}: {str(e)[:30]}"

            # One critical section, so a reader never sees a result without
            # the matching available_nodes / progress update.
            with self._lock:
                self.results.append((success, message))
                if success:
                    self.available_nodes.append(node)
                self._progress += 1
                progress = self._progress
                total = self._total

            if progress % 10 == 0 or progress == total:
                logger.debug(f"进度: {progress}/{total}")

            if self._progress_callback:
                try:
                    self._progress_callback(progress, total)
                except Exception:
                    # A faulty observer must not stop the test run.
                    logger.exception("进度回调执行失败")

            self._progress_queue.put((progress, total, success, message))

    async def test_direct_connection(self) -> tuple:
        """Request ``test_url`` without any proxy (certificate checks disabled).

        Returns:
            ``(True, message)`` on HTTP 200, otherwise ``(False, message)``.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=self.timeout)
            async with aiohttp.ClientSession() as session:
                async with session.get(self.test_url, timeout=timeout, ssl=False) as resp:
                    if resp.status != 200:
                        return False, f"⚠️ 直连返回: HTTP {resp.status}"
                    return True, f"✅ 直连成功 (HTTP {resp.status})"
        except asyncio.TimeoutError:
            return False, "⏱️ 直连超时"
        except Exception as e:
            return False, f"❌ 直连失败: {str(e)[:50]}"

    async def test_all_nodes_async(self):
        """Parse the configuration and test every node concurrently.

        State from a previous run on the same instance is discarded first.

        Returns:
            ``(total, available)``: number of parsed nodes and number of nodes
            that passed; ``(0, 0)`` when nothing could be parsed.
        """
        logger.info("开始 parse_yaml...")
        nodes = self.parse_yaml()

        # Start from a clean slate so a reused instance does not accumulate
        # results and counters across runs.
        with self._lock:
            self.results = []
            self.available_nodes = []
            self._progress = 0
            self._total = len(nodes)

        logger.info(f"解析到 {len(nodes)} 个节点")

        if not nodes:
            logger.warning("没有解析到任何节点!")
            return 0, 0

        # Never start more workers than there are nodes; the worker count is
        # also the stride each worker uses to pick its nodes.
        worker_count = max(1, min(int(self.max_concurrent), len(nodes)))

        timeout = aiohttp.ClientTimeout(total=self.timeout)
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=self.max_concurrent,
            ttl_dns_cache=300,
        )

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            tasks = [
                asyncio.create_task(self._worker(session, nodes, offset, worker_count))
                for offset in range(worker_count)
            ]
            await asyncio.gather(*tasks, return_exceptions=True)

        return self._total, len(self.available_nodes)

    def test_all_nodes(self):
        """Synchronous wrapper: run ``test_all_nodes_async`` in a new event loop."""
        return asyncio.run(self.test_all_nodes_async())

    def get_available_yaml(self) -> Optional[str]:
        """Render the available nodes as a Clash ``proxies:`` YAML document.

        Values are serialised with ``yaml.safe_dump`` so names, passwords and
        other strings containing YAML-significant characters stay valid, and
        the internal type ``shadowsocks`` is written back as Clash's ``ss``.

        Returns:
            The YAML text preceded by a short comment header, or ``None`` when
            no node is available.
        """
        if not self.available_nodes:
            return None

        clash_types = {'shadowsocks': 'ss'}
        optional_keys = ('username', 'password', 'cipher', 'uuid', 'alterId', 'network')

        proxies = []
        for node in self.available_nodes:
            node_type = node.get('type', 'http')
            entry = {
                'name': node.get('name', 'Unknown'),
                'type': clash_types.get(node_type, node_type),
                'server': node.get('server', ''),
                'port': node.get('port', 0),
            }
            # Optional fields are written only when they carry a value.
            for key in optional_keys:
                if node.get(key):
                    entry[key] = node[key]

            tls = node.get('tls')
            if tls:
                entry['tls'] = tls if isinstance(tls, bool) else str(tls).lower()
            if node.get('skip-cert-verify'):
                entry['skip-cert-verify'] = True

            proxies.append(entry)

        header = (
            "# 可用节点列表 - 自动生成\n"
            f"# 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"# 节点数量: {len(self.available_nodes)}\n\n"
        )
        body = yaml.safe_dump(
            {'proxies': proxies},
            allow_unicode=True,        # keep non-ASCII node names readable
            sort_keys=False,           # keep name/type/server/port first
            default_flow_style=False,
        )
        return header + body
|
|
|
|
| |
# --- Page title -------------------------------------------------------------
st.title("🔍 节点可用性测试工具")
logger.info("Streamlit 应用启动")

# --- Result of the previous run (kept in session state) ----------------------
# Offered again because a script rerun drops the freshly rendered result area.
if 'last_available_yaml' in st.session_state:
    saved = st.session_state['last_available_yaml']
    saved_count = saved['count']
    saved_time = saved['time']
    with st.expander(f"📦 上次测试结果 ({saved_count} 个节点 - {saved_time})", expanded=False):
        st.info(f"📁 上次测试保存了 {saved_count} 个可用节点")
        download_col, caption_col = st.columns(2)
        download_col.download_button(
            label="📥 下载 available.yaml",
            data=saved['content'],
            file_name="available.yaml",
            mime="text/yaml"
        )
        caption_col.caption(f"保存时间: {saved_time}")

st.markdown("---")

# --- Sidebar: test settings, dependency status and file upload ---------------
# NOTE(review): the original indentation was lost; everything up to and
# including the uploader is assumed to live in the sidebar.
sidebar = st.sidebar
sidebar.header("⚙️ 配置")

test_url = sidebar.text_input("测试网址", value="https://www.google.com")
max_concurrent = sidebar.number_input("并发数量", min_value=1, value=50, step=1)
timeout = sidebar.number_input("超时时间(秒)", min_value=1, value=5, step=1)

sidebar.markdown("---")
sidebar.markdown(f"**aioquic**: {'✅ 已安装' if AIOQUIC_AVAILABLE else '❌ 未安装'}")
if not AIOQUIC_AVAILABLE:
    sidebar.caption("运行 `pip install aioquic` 启用真实代理测试")

sidebar.markdown("---")

sidebar.header("📁 上传配置文件")
uploaded_file = sidebar.file_uploader(
    "选择YAML文件",
    type=['yaml', 'yml', 'conf'],
    help="上传包含代理节点配置的YAML文件"
)
|
|
| |
if uploaded_file is None:
    # Nothing uploaded yet: show usage help only.
    st.info("👈 请在侧边栏上传YAML配置文件")
    st.markdown("""
### 支持的节点格式

| 格式 | 支持状态 |
|------|----------|
| Clash proxies[] | ✅ 完全支持 |
| V2Ray JSON | ✅ 完全支持 |
| HTTP 代理 | ✅ 完全支持 |
| SOCKS5 | ✅ 完全支持 |
| Shadowsocks | ✅ TCP端口检测 |
| VMess/VLESS/Trojan | ✅ TCP端口检测 |

### 测试方式
- HTTP/SOCKS5: 实际代理请求测试
- SS/VMess/VLESS/Trojan: TCP端口连通性
""")
else:
    # utf-8-sig drops a leading BOM (which would otherwise corrupt the first
    # key); undecodable bytes are replaced so a badly encoded upload degrades
    # instead of crashing the page.
    yaml_content = uploaded_file.getvalue().decode('utf-8-sig', errors='replace')

    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("📄 文件信息")
        st.text(f"文件名: {uploaded_file.name}")
        st.text(f"文件大小: {uploaded_file.size / 1024:.2f} KB")

    with col2:
        st.subheader("📊 节点统计")
        try:
            # Throw-away tester used only to count the parsable nodes.
            temp_tester = NodeTester(yaml_content, test_url, 1, 1)
            nodes = temp_tester.parse_yaml()
            st.metric("检测到节点数", len(nodes))
        except Exception:
            # Fall back to a rough estimate, but keep the reason in the log.
            logger.exception("节点统计解析失败")
            st.metric("检测到节点数", yaml_content.count('name:'))

    st.markdown("---")

    if st.button("🚀 开始测试", type="primary", use_container_width=True):
        logger.info("用户点击了开始测试按钮")

        st.subheader("⏳ 测试进度")
        progress_bar = st.progress(0)
        progress_text = st.empty()

        # Always build a fresh tester from the current upload and settings.
        # Reusing one cached in session state would test the previous file
        # with the previous settings and pile up results across runs.
        logger.info("正在初始化 NodeTester...")
        tester = NodeTester(yaml_content, test_url, max_concurrent, timeout)
        logger.info("NodeTester 初始化完成")

        logger.info("开始直连测试...")
        with st.expander("🌐 直连测试", expanded=True):
            direct_ok, direct_msg = asyncio.run(tester.test_direct_connection())
            logger.info(f"直连测试结果: {direct_ok} - {direct_msg}")
            if direct_ok:
                st.success(direct_msg)
            else:
                st.warning(direct_msg)
                st.info("直连失败可能是因为目标网址被墙,但这不影响通过代理测试")

        progress_text.markdown("⏳ 准备中...")
        logger.info("开始异步测试...")

        # Plain dict shared with the worker thread; also published in session
        # state under the same key and shape as before (plus 'error').
        test_state = {
            'total': 0,
            'available': 0,
            'complete': False,
            'results': [],
            'tester': tester,
            'error': None,
        }
        st.session_state.test_state = test_state

        def run_test():
            """Run the whole node test on a private event loop (worker thread)."""
            logger.info("run_test 线程启动")
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                logger.info("准备执行 test_all_nodes_async")
                total, available = loop.run_until_complete(tester.test_all_nodes_async())
                logger.info(f"test_all_nodes_async 执行完成: total={total}, available={available}")
                test_state['total'] = total
                test_state['available'] = available
            except Exception as exc:
                # Report what was finished so far instead of dying silently
                # and leaving the page with stale zeros.
                logger.exception("run_test 执行失败")
                test_state['error'] = str(exc)
                test_state['total'] = tester._total
                test_state['available'] = len(tester.available_nodes)
            finally:
                test_state['results'] = tester.results.copy()
                test_state['complete'] = True
                loop.close()
                logger.info("run_test 线程结束")

        logger.info("启动测试线程...")
        test_thread = threading.Thread(target=run_test)
        test_thread.start()

        # Poll the tester's counters from the script thread; Streamlit
        # widgets are only updated from here, never from the worker thread.
        logger.info("进入轮询循环...")
        poll_interval = 1.0
        poll_count = 0
        while test_thread.is_alive():
            current = tester._progress
            known_total = tester._total
            if known_total > 0:
                pct = current / known_total
                progress_bar.progress(min(pct, 1.0))
                progress_text.markdown(f"🔄 测试中... {current}/{known_total} 节点 ({pct*100:.1f}%)")
            else:
                progress_text.markdown(f"🔄 解析配置中... ({poll_count}s)")

            logger.info(f"轮询 #{poll_count}: progress={current}, total={known_total}")

            poll_count += 1
            # join() with a timeout waits like sleep() but returns as soon as
            # the thread finishes.
            test_thread.join(timeout=poll_interval)

        test_thread.join()

        if test_state['error']:
            st.error(f"测试过程中发生错误: {test_state['error']}")

        available_yaml = tester.get_available_yaml()

        total = test_state['total']
        available = test_state['available']
        results_so_far = test_state['results']

        progress_bar.progress(1.0)
        progress_text.markdown(f"✅ 测试完成! {available}/{total} 可用")

        st.markdown("---")
        st.subheader("📋 测试结果")

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("总节点数", total)
        with col2:
            st.metric("可用节点", available, delta=f"{available/total*100:.1f}%" if total > 0 else "0%")
        with col3:
            st.metric("失败节点", total - available)

        # Build the plain-text report offered for download.
        log_lines = []
        success_count = 0
        fail_count = 0
        for success, message in results_so_far:
            status = "✅ 可用" if success else "❌ 不可用"
            log_lines.append(f"{status} | {message}")
            if success:
                success_count += 1
            else:
                fail_count += 1

        # Guard the division: nothing parsed means total == 0.
        pass_rate = success_count / total * 100 if total > 0 else 0.0

        report_header = [
            "节点测试报告",
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "========================================",
            f"总节点数: {total}",
            f"可用节点: {success_count}",
            f"不可用节点: {fail_count}",
            f"通过率: {pass_rate:.1f}% (测试成功)",
            "========================================",
            "",
            "详细日志:",
            "----------------------------------------",
        ]
        log_content = "\n".join(report_header) + "\n" + "\n".join(log_lines)

        st.download_button(
            label="📥 下载完整日志",
            data=log_content,
            file_name=f"test_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
            mime="text/plain",
            use_container_width=True
        )

        if available > 0 and available_yaml:
            st.markdown("---")
            st.subheader("💾 下载可用节点")

            # Remember the result: clicking any download button reruns the
            # script, and this branch is not rendered on that rerun.
            st.session_state['last_available_yaml'] = {
                'content': available_yaml,
                'count': available,
                'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            }

            st.download_button(
                label=f"📥 下载可用节点 ({available} 个) - 固定文件名可用.yaml",
                data=available_yaml,
                file_name="available.yaml",
                mime="text/yaml",
                use_container_width=True
            )

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            st.download_button(
                label=f"📥 下载备份 ({timestamp})",
                data=available_yaml,
                file_name=f"available_{timestamp}.yaml",
                mime="text/yaml",
                use_container_width=True
            )

            preview_size = min(500, len(available_yaml))
            st.caption(f"📄 预览 (前 {preview_size} 字符)")
            st.code(available_yaml[:500], language="yaml")

            with st.expander("📋 代理链配置 (Clash Meta)"):
                # Short listing of at most the first ten available nodes.
                proxy_chain = "proxies:\n"
                for node in tester.available_nodes[:10]:
                    name = node.get('name', 'Unknown')
                    proxy_chain += f"  - name: \"{name}\"\n"
                    proxy_chain += f"    type: {node.get('type', 'http')}\n"
                    proxy_chain += f"    server: {node.get('server', '')}\n"
                    proxy_chain += f"    port: {node.get('port', 0)}\n"
                st.code(proxy_chain, language="yaml")
        else:
            st.warning("❌ 没有可用的节点")
|
|
# --- Footer: separator plus a centred, grey capability note -------------------
st.markdown("---")

footer_html = """
<div style='text-align: center; color: gray;'>
<p>🚀 支持 Shadowsocks / VMess / VLESS / Trojan TCP端口检测</p>
</div>
"""
st.markdown(footer_html, unsafe_allow_html=True)