| import os |
| import re |
| import ast |
| import yaml |
| import httpx |
| import base64 |
| import inspect |
| from dotenv import load_dotenv |
| from fastapi import FastAPI, Response, HTTPException, Request |
|
|
# Request headers attached to the outbound HTTP calls made by this module.
# The user-agent value identifies the caller as Clash for Windows.
HEADERS = dict(
    [
        ('accept', 'application/json, text/plain, */*'),
        ('accept-language', 'zh-CN'),
        ('pragma', 'no-cache'),
        ('sec-ch-ua', '"Not?A_Brand";v="8", "Chromium";v="108"'),
        ('sec-ch-ua-mobile', '?0'),
        ('sec-ch-ua-platform', '"Windows"'),
        ('sec-fetch-dest', 'empty'),
        ('sec-fetch-mode', 'cors'),
        ('sec-fetch-site', 'cross-site'),
        ('user-agent', 'ClashforWindows/0.20.39'),
    ]
)
|
|
# Application object that the route decorators in this module register on.
app = FastAPI()

# Load variables from a .env file (if any) before the settings are read.
load_dotenv()

# Settings are read once at import time; each one is None when unset.
API_KEY = os.getenv('API_KEY')                # compared with the `key` query parameter
SUBSCRIBE_URLS = os.getenv('SUBSCRIBE_URLS')  # newline-separated subscription URLs
INJECT_SCRIPT = os.getenv('INJECT_SCRIPT')    # source text of the optional handle_mixin hook
|
|
def validate_function_signature(code_string):
    """Check with ``ast`` that the code is exactly one ``handle_mixin(content)``.

    The source must parse to a module whose only top-level statement is a
    plain ``def`` named ``handle_mixin`` taking a single parameter called
    ``content``, with no defaults, ``*args``, ``**kwargs`` or positional-only
    parameters.  The code is only parsed here, never executed.

    Args:
        code_string: Python source text to inspect.

    Returns:
        True when every check passes.  Otherwise the reason is printed and
        False is returned; a syntax error or any other exception raised while
        inspecting the code is reported the same way.
    """
    try:
        statements = ast.parse(code_string).body
        problem = None

        if not statements:
            problem = "错误:代码为空。"
        elif len(statements) != 1 or not isinstance(statements[0], ast.FunctionDef):
            problem = "错误:代码必须包含单个函数定义。"
        else:
            func = statements[0]
            params = func.args
            if func.name != "handle_mixin":
                problem = f"错误:函数名必须为 'handle_mixin',而不是 '{func.name}'。"
            elif (params.defaults or params.kw_defaults or params.vararg
                  or params.kwarg or params.posonlyargs):
                problem = "错误:函数不能有默认参数、可变位置参数、可变关键字参数、或者仅位置参数"
            elif len(params.args) != 1:
                problem = f"错误:函数必须恰好接受一个参数,而不是 {len(params.args)} 个。"
            elif params.args[0].arg != "content":
                problem = f"错误:参数名称必须为 'content',而不是 '{params.args[0].arg}'。"

        if problem is not None:
            print(problem)
            return False
        return True
    except SyntaxError as err:
        print(f"语法错误:{err}")
        return False
    except Exception as err:
        print(f"发生错误:{err}")
        return False
|
|
def load_mixin_function(code_string) -> callable:
    """Turn source text into the ``handle_mixin`` function it defines.

    The text is first checked by ``validate_function_signature``.  When it
    passes, it is compiled and executed in a fresh namespace, and the
    ``handle_mixin`` object created there is returned.

    NOTE(review): executing the text runs arbitrary Python inside this
    process, so it must only ever come from a trusted source.

    Args:
        code_string: Python source expected to define ``handle_mixin(content)``.

    Returns:
        The loaded function, or None when validation fails or when
        compiling/executing the text raises (the error is printed).
    """
    if not validate_function_signature(code_string):
        return None

    namespace = {}
    try:
        exec(compile(code_string, '<string>', 'exec'), namespace)
        return namespace['handle_mixin']
    except Exception as err:
        print(f"解析函数时出错:{err}")
        return None
|
|
def _identity_mixin(content):
    """Fallback hook: hand the configuration back unchanged."""
    return content


# Resolve the optional user hook once, at import time.
# load_mixin_function reports its own failures and returns None instead of
# raising, so the fallback must cover a None result as well as an exception;
# otherwise handle_mixin would be None and every later call would fail.
handle_mixin = None
if INJECT_SCRIPT:  # no script configured is not an error, so skip loading quietly
    try:
        handle_mixin = load_mixin_function(INJECT_SCRIPT)
    except Exception as e:
        print(f"无法加载 MINIX 函数, 错误信息: {e}")
if handle_mixin is None:
    handle_mixin = _identity_mixin
|
|
# Name of the url-test group that subscribe_mixin adds to every configuration.
_AUTO_GROUP_NAME = "AI Unrestrict"

# Single case-insensitive pattern holding the node-name keywords that qualify
# a proxy for the group above; compiled once instead of on every call.
# NOTE(review): the short codes match as plain substrings, so "us" also hits
# names such as "Russia" or "Australia" and "ca" hits "Africa" — tighten the
# pattern if that is not intended.
_AUTO_GROUP_PATTERN = re.compile(
    r"美国|america|us|新加坡|singapore|sg|加拿大|canada|ca",
    re.IGNORECASE,
)


def subscribe_mixin(content: str) -> str:
    """Take a YAML string and return the transformed YAML string.

    Steps applied to the parsed configuration:

    1. Collect the names of all entries under ``proxies`` whose name matches
       ``_AUTO_GROUP_PATTERN``.
    2. Build a ``url-test`` proxy group named ``_AUTO_GROUP_NAME`` from them.
    3. Put that name at the front of the first existing proxy group's
       ``proxies`` list and append the new group to ``proxy-groups``
       (creating ``proxy-groups`` when it is missing or not a list).
    4. Remove the ``socks-port`` key.
    5. Pass the configuration through ``handle_mixin``; if the hook raises,
       the error is printed and the configuration is used as it stands.

    Args:
        content: YAML document text.

    Returns:
        The modified document dumped as YAML, or ``""`` when the input cannot
        be parsed, is not a mapping, or any other error occurs (the error is
        printed).
    """
    try:
        d = yaml.safe_load(content)
        if not isinstance(d, dict):
            # Empty or non-mapping documents cannot be transformed.
            print("其他错误:订阅内容不是 YAML 映射")
            return ""

        # Entries that are not mappings, or whose name is not a string, are
        # skipped instead of aborting the whole conversion.
        proxies = d.get("proxies")
        matching_proxies = []
        if isinstance(proxies, list):
            matching_proxies = [
                proxy["name"]
                for proxy in proxies
                if isinstance(proxy, dict)
                and isinstance(proxy.get("name"), str)
                and _AUTO_GROUP_PATTERN.search(proxy["name"])
            ]

        new_proxy_group = {
            "name": _AUTO_GROUP_NAME,
            "type": "url-test",
            "proxies": matching_proxies,
            "url": "http://www.gstatic.com/generate_204",
            "interval": 7200
        }

        groups = d.get("proxy-groups")
        if isinstance(groups, list):
            # Look at the first group BEFORE appending: with an empty list the
            # new group would otherwise become "the first group" and end up
            # listing its own name among its proxies.
            first_group = groups[0] if groups else None
            if isinstance(first_group, dict) and isinstance(first_group.get("proxies"), list):
                first_group["proxies"].insert(0, _AUTO_GROUP_NAME)
            groups.append(new_proxy_group)
        else:
            d["proxy-groups"] = [new_proxy_group]

        d.pop('socks-port', None)

        # The user hook is best-effort: a failure keeps the configuration
        # built so far, but the cause is now reported.
        try:
            d = handle_mixin(d)
        except Exception as e:
            print(f"执行 Minix 函数时出错!错误信息: {e}")

        return yaml.dump(d, allow_unicode=True, indent=2)

    except yaml.YAMLError as e:
        print(f"YAML 解析错误:{e}")
        return ""
    except Exception as e:
        print(f"其他错误:{e}")
        return ""
|
|
@app.get("/getsub")
async def read_subscribe(request: Request, key: str):
    """Return the merged, post-processed subscription as YAML.

    Every configured subscription URL is rewritten to point at this service's
    ``/proxy`` endpoint, the rewritten URLs are joined with ``|`` and sent to
    the converter at ``http://127.0.0.1:25500/sub`` with ``target=clash``, and
    the converter's output is passed through ``subscribe_mixin``.

    Args:
        request: Incoming request; its ``base_url`` is the prefix of the
            rewritten URLs.
        key: API key supplied by the caller.

    Raises:
        HTTPException: 401 when the key does not match (or no key is
            configured); 500 when no subscription URL is configured, when the
            converter call fails, or when post-processing yields nothing.
    """
    import hmac
    from urllib.parse import quote

    # Fail closed when no key is configured, and compare in constant time so
    # the comparison does not leak how much of the key matched.
    if not API_KEY or not hmac.compare_digest(key.encode(), API_KEY.encode()):
        raise HTTPException(status_code=401, detail="Unauthorized")

    if not SUBSCRIBE_URLS:
        raise HTTPException(status_code=500, detail="SUBSCRIBE_URLS not configured")

    proxy_urls = []
    for line in SUBSCRIBE_URLS.splitlines():
        # Encode the stripped URL: surrounding whitespace (e.g. the "\r" left
        # by CRLF line endings) must not become part of the target URL.
        url = line.strip()
        if not url:
            continue
        token = base64.b64encode(url.encode()).decode()
        # Standard base64 may contain "+", "/" and "=".  Percent-encode the
        # token, otherwise "+" is read back as a space when the query string
        # is parsed by /proxy.
        proxy_urls.append(f'{request.base_url}proxy?encoded_url={quote(token, safe="")}')

    if not proxy_urls:
        raise HTTPException(status_code=500, detail="SUBSCRIBE_URLS not configured")

    args = {
        'target': 'clash',
        'url': '|'.join(proxy_urls),
    }

    async with httpx.AsyncClient() as client:
        try:
            resp = await client.get('http://127.0.0.1:25500/sub', params=args, headers=HEADERS)
            resp.raise_for_status()
        except Exception as e:
            # Transport errors and non-2xx converter responses alike.
            raise HTTPException(status_code=500, detail=str(e)) from e

    data = subscribe_mixin(resp.text)
    if not data:
        # subscribe_mixin signals failure with an empty string; do not serve
        # that as a successful, empty subscription.
        raise HTTPException(status_code=500, detail="Failed to process subscription content")
    return Response(content=data, media_type='text/yaml')
|
|
# Upstream headers describing the wire encoding/framing of the upstream
# response.  The body relayed below is the content httpx has already decoded,
# so forwarding these would mislabel it (e.g. "content-encoding: gzip" on a
# plain body, or a content-length that no longer matches).
_PROXY_DROPPED_HEADERS = frozenset({
    "connection",
    "content-encoding",
    "content-length",
    "keep-alive",
    "transfer-encoding",
})


@app.get("/proxy")
async def proxy_url(encoded_url: str):
    """Fetch a base64-encoded URL with the custom ``HEADERS`` and relay the reply.

    Some subscription addresses inspect the request headers, but the
    tindy2013/subconverter project does not support configuring request
    headers.  Requests are therefore redirected to this service, which
    performs them with custom headers to get past that check.

    NOTE(review): this endpoint takes no key and will fetch any URL it is
    given — consider restricting it to the configured subscription URLs.

    Args:
        encoded_url: Standard base64 encoding of the UTF-8 target URL.

    Raises:
        HTTPException: 400 when ``encoded_url`` is not valid base64/UTF-8;
            500 when the upstream request fails.
    """
    # Decode outside the upstream try-block so that the 400 below reaches the
    # caller instead of being re-wrapped as a 500 by a broad handler.
    try:
        # Query-string decoding turns an unescaped "+" into a space; base64
        # never contains spaces, so restoring them is safe.
        decoded_bytes = base64.b64decode(encoded_url.replace(' ', '+'))
        target_url = decoded_bytes.decode('utf-8')
    except ValueError as e:  # binascii.Error and UnicodeDecodeError are ValueErrors
        raise HTTPException(status_code=400, detail="Invalid base64 encoding") from e

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(target_url, headers=HEADERS)
    except httpx.RequestError as e:
        raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e

    forwarded_headers = {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in _PROXY_DROPPED_HEADERS
    }
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=forwarded_headers,
        media_type=response.headers.get("content-type")
    )
|
|
@app.get("/")
async def read_root():
    """Answer requests for the root path with a fixed hello/world payload."""
    payload = {"hello": 'world'}
    return payload
|
|