| import re |
| import yaml |
| import aiohttp |
| import asyncio |
| import datetime |
| import sys |
| import traceback |
| from aiohttp import web |
| from urllib.parse import parse_qs |
| from cachetools import TTLCache |
|
|
| |
| cache = TTLCache(maxsize=1000, ttl=1800) |
|
|
| async def fetch_url(url, session): |
| async with session.get(url) as response: |
| return await response.text() |
|
|
| async def extract_and_transform_proxies(input_text): |
| try: |
| data = yaml.safe_load(input_text) |
| if isinstance(data, dict) and 'proxies' in data: |
| proxies_list = data['proxies'] |
| elif isinstance(data, list): |
| proxies_list = data |
| else: |
| proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE) |
| if proxies_match: |
| proxies_text = proxies_match.group(1) |
| proxies_list = yaml.safe_load(proxies_text) |
| else: |
| return "未找到有效的代理配置" |
| except yaml.YAMLError: |
| return "YAML解析错误" |
|
|
| if not proxies_list: |
| return "未找到有效的代理配置" |
|
|
| transformed_proxies = [] |
|
|
| for proxy in proxies_list: |
| if proxy.get('type') == 'ss': |
| name = proxy.get('name', '').strip() |
| server = proxy.get('server', '').strip() |
| port = str(proxy.get('port', '')).strip() |
| |
| ss_parts = [f"{name} = ss, {server}, {port}"] |
| |
| if 'cipher' in proxy: |
| ss_parts.append(f"encrypt-method={proxy['cipher'].strip()}") |
| if 'password' in proxy: |
| ss_parts.append(f"password={proxy['password'].strip()}") |
| if 'udp' in proxy: |
| ss_parts.append(f"udp-relay={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}") |
|
|
| transformed = ", ".join(ss_parts) |
| transformed_proxies.append(transformed) |
|
|
| elif proxy.get('type') == 'trojan': |
| name = proxy.get('name', '').strip() |
| server = proxy.get('server', '').strip() |
| port = str(proxy.get('port', '')).strip() |
| |
| trojan_parts = [f"{name} = trojan, {server}, {port}"] |
| |
| if 'password' in proxy: |
| trojan_parts.append(f"password={proxy['password'].strip()}") |
| if 'sni' in proxy: |
| trojan_parts.append(f"sni={proxy['sni'].strip()}") |
| if 'skip-cert-verify' in proxy: |
| trojan_parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}") |
| if 'udp' in proxy: |
| trojan_parts.append(f"udp={'true' if proxy['udp'] in [True, 'true', 'True'] else 'false'}") |
|
|
| transformed = ", ".join(trojan_parts) |
| transformed_proxies.append(transformed) |
|
|
| return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS或Trojan代理配置" |
|
|
| async def log_request(request, response, cache_status): |
| timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
| client_ip = request.remote |
| request_line = f"{request.method} {request.path}" |
| if request.query_string: |
| request_line += f"?{request.query_string}" |
| status_code = response.status |
| content_length = response.content_length |
| user_agent = request.headers.get('User-Agent', 'Unknown') |
|
|
| log_message = ( |
| f"{timestamp} - {client_ip} - \"{request_line}\" {status_code} {content_length} - " |
| f"User-Agent: {user_agent} - Cache: {cache_status}" |
| ) |
| print(log_message, flush=True) |
|
|
| @web.middleware |
| async def logging_middleware(request, handler): |
| response, cache_status = await handler(request) |
| await log_request(request, response, cache_status) |
| return response |
|
|
| async def handle_request(request): |
| if request.path == '/': |
| query_params = parse_qs(request.query_string) |
| if 'url' in query_params: |
| url = query_params['url'][0] |
| force_refresh = 'nocache' in query_params |
| |
| if not force_refresh and url in cache: |
| return web.Response(text=cache[url], content_type='text/plain'), "Hit" |
| |
| try: |
| async with aiohttp.ClientSession() as session: |
| input_text = await fetch_url(url, session) |
| result = await extract_and_transform_proxies(input_text) |
| |
| |
| cache[url] = result |
| |
| return web.Response(text=result, content_type='text/plain'), "Miss" |
| except Exception as e: |
| return web.Response(text=f"Error: {str(e)}", status=500), "Error" |
| else: |
| usage_guide = """ |
| <html> |
| <body> |
| <h1>代理配置转换工具</h1> |
| <p>使用方法:在URL参数中提供包含代理配置的网址。</p> |
| <p>示例:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config</code></p> |
| <p>强制刷新缓存:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config&nocache</code></p> |
| </body> |
| </html> |
| """ |
| return web.Response(text=usage_guide, content_type='text/html'), "Guide" |
| else: |
| return web.Response(text="Not Found", status=404), "NotFound" |
|
|
| async def init_app(): |
| app = web.Application(middlewares=[logging_middleware]) |
| app.router.add_get('/', handle_request) |
| return app |
|
|
| if __name__ == "__main__": |
| print(f"===== Application Startup at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====") |
| print("Server running on port 8080") |
| web.run_app(init_app(), port=8080, print=lambda _: None) |
|
|