Spaces:
Paused
Paused
| import os | |
| import re | |
| import ast | |
| import yaml | |
| import httpx | |
| import base64 | |
| import inspect | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, Response, HTTPException, Request | |
| HEADERS = { | |
| 'accept': 'application/json, text/plain, */*', | |
| 'accept-language': 'zh-CN', | |
| 'pragma': 'no-cache', | |
| 'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108"', | |
| 'sec-ch-ua-mobile': '?0', | |
| 'sec-ch-ua-platform': '"Windows"', | |
| 'sec-fetch-dest': 'empty', | |
| 'sec-fetch-mode': 'cors', | |
| 'sec-fetch-site': 'cross-site', | |
| 'user-agent': 'ClashforWindows/0.20.39', | |
| } | |
| app = FastAPI() | |
| # 从环境变量中获取密钥 | |
| load_dotenv() | |
| API_KEY = os.environ.get('API_KEY') | |
| SUBSCRIBE_URLS = os.environ.get('SUBSCRIBE_URLS') # 存储真实URL的变量 | |
| INJECT_SCRIPT = os.environ.get('INJECT_SCRIPT') | |
| def validate_function_signature(code_string): | |
| """ 使用 ast 验证函数签名是否符合要求。 | |
| """ | |
| try: | |
| module = ast.parse(code_string) | |
| if not module.body: | |
| print("错误:代码为空。") | |
| return False | |
| if len(module.body) != 1 or not isinstance(module.body[0], ast.FunctionDef): | |
| print("错误:代码必须包含单个函数定义。") | |
| return False | |
| function_def = module.body[0] | |
| # Step 01. 检查函数名 | |
| if function_def.name != "handle_mixin": | |
| print(f"错误:函数名必须为 'handle_mixin',而不是 '{function_def.name}'。") | |
| return False | |
| # Step 02. 检查参数个数 | |
| args = function_def.args | |
| if args.defaults or args.kw_defaults or args.vararg or args.kwarg or args.posonlyargs: | |
| print("错误:函数不能有默认参数、可变位置参数、可变关键字参数、或者仅位置参数") | |
| return False | |
| if len(args.args) != 1: | |
| print(f"错误:函数必须恰好接受一个参数,而不是 {len(args.args)} 个。") | |
| return False | |
| # Step 03. 检查参数名称 | |
| if args.args[0].arg != "content": | |
| print(f"错误:参数名称必须为 'content',而不是 '{args.args[0].arg}'。") | |
| return False | |
| return True | |
| except SyntaxError as e: | |
| print(f"语法错误:{e}") | |
| return False | |
| except Exception as e: | |
| print(f"发生错误:{e}") | |
| return False | |
| def load_mixin_function(code_string) -> callable: | |
| if not validate_function_signature(code_string): | |
| return | |
| try: | |
| code_obj = compile(code_string, '<string>', 'exec') | |
| local_namespace = {} | |
| exec(code_obj, local_namespace) | |
| handle_mixin = local_namespace['handle_mixin'] | |
| return handle_mixin | |
| except Exception as e: | |
| print(f"解析函数时出错:{e}") | |
| try: | |
| handle_mixin = load_mixin_function(INJECT_SCRIPT) | |
| except Exception as e: | |
| handle_mixin = lambda x:x | |
| print(f"无法加载 MINIX 函数, 错误信息: {e}") | |
| def subscribe_mixin(content: str) -> str: | |
| """输入 YAML 字符串,输出转换后的 YAML 字符串 | |
| """ | |
| try: | |
| d = yaml.safe_load(content) | |
| my_auto_group_name = "AI Unrestrict" | |
| regex_list = [ | |
| re.compile(r"美国", re.IGNORECASE), | |
| re.compile(r"america", re.IGNORECASE), | |
| re.compile(r"us", re.IGNORECASE), | |
| re.compile(r"新加坡", re.IGNORECASE), | |
| re.compile(r"singapore", re.IGNORECASE), | |
| re.compile(r"sg", re.IGNORECASE), | |
| re.compile(r"加拿大", re.IGNORECASE), | |
| re.compile(r"canada", re.IGNORECASE), | |
| re.compile(r"ca", re.IGNORECASE), | |
| ] | |
| matching_proxies = [] # 用于存储匹配的代理名称 | |
| # 1. 查找并保存符合正则表达式的 proxy name | |
| if "proxies" in d and isinstance(d["proxies"], list): | |
| for proxy in d["proxies"]: | |
| if "name" in proxy: | |
| for regex in regex_list: | |
| if regex.search(proxy["name"]): | |
| matching_proxies.append(proxy["name"]) | |
| break | |
| # 2. 创建新的 proxy-group 对象 | |
| new_proxy_group = { | |
| "name": my_auto_group_name, | |
| "type": "url-test", | |
| "proxies": matching_proxies, | |
| "url": "http://www.gstatic.com/generate_204", | |
| "interval": 7200 | |
| } | |
| # 3. 将新的 proxy-group 添加到 proxy-groups 数组 | |
| if "proxy-groups" in d and isinstance(d["proxy-groups"], list): | |
| d["proxy-groups"].append(new_proxy_group) | |
| # 4. 将 myAutoGroupName 添加到第一个 proxy-group 的 "proxies" 列表的最前面 | |
| if d["proxy-groups"] and len(d["proxy-groups"]) > 0 and \ | |
| "proxies" in d["proxy-groups"][0] and isinstance(d["proxy-groups"][0]["proxies"], list): | |
| d["proxy-groups"][0]["proxies"].insert(0, my_auto_group_name) | |
| else: | |
| d["proxy-groups"] = [new_proxy_group] | |
| d.pop('socks-port', None) | |
| # 在此处尝试调用 mixin 函数 | |
| try: | |
| d = handle_mixin(d) | |
| except Exception as e: | |
| print(f"执行 Minix 函数时出错!") | |
| modified_yaml = yaml.dump(d, allow_unicode=True, indent=2) | |
| return modified_yaml | |
| except yaml.YAMLError as e: | |
| print(f"YAML 解析错误:{e}") | |
| return "" | |
| except Exception as e: | |
| print(f"其他错误:{e}") | |
| return "" | |
| async def read_subscribe(request: Request, key: str): | |
| # 验证API Key | |
| if key != API_KEY: | |
| raise HTTPException(status_code=401, detail="Unauthorized") | |
| # 从环境变量获取URL列表 | |
| if not SUBSCRIBE_URLS: | |
| raise HTTPException(status_code=500, detail="SUBSCRIBE_URLS not configured") | |
| urls = SUBSCRIBE_URLS.split('\n') | |
| proxy_urls = [ | |
| f'{request.base_url}proxy?encoded_url={base64.b64encode(url.encode()).decode()}' | |
| for url in urls | |
| if url.strip() | |
| ] | |
| merged_urls: str = '|'.join(proxy_urls) | |
| args = { | |
| 'target': 'clash', | |
| 'url': merged_urls | |
| } | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| resp = await client.get('http://127.0.0.1:25500/sub', params=args, headers=HEADERS) | |
| resp.raise_for_status() | |
| data = resp.text | |
| data = subscribe_mixin(data) | |
| return Response(content=data, media_type='text/yaml') | |
| except httpx.RequestError as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def proxy_url(encoded_url: str): | |
| ''' | |
| 有一些订阅地址, 需要检测请求头. | |
| 然而 tindy2013/subconverter 项目并不支持请求头配置. | |
| 所以将请求重定向到本服务中, 然后通过自定义请求头绕过. | |
| ''' | |
| try: | |
| try: | |
| decoded_bytes = base64.b64decode(encoded_url) | |
| target_url = decoded_bytes.decode('utf-8') | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail="Invalid base64 encoding") | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(target_url, headers=HEADERS) | |
| return Response( | |
| content=response.content, | |
| status_code=response.status_code, | |
| headers=dict(response.headers), | |
| media_type=response.headers.get("content-type") | |
| ) | |
| except httpx.RequestError as e: | |
| raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error: {str(e)}") | |
| async def read_root(): | |
| return {"hello": 'world'} | |