import sys import os import subprocess import importlib.util import time import threading # --- 关键修正:依赖包检查与安装 --- def check_and_install_packages(packages): """检查并安装所有必需的Python库。""" print("===== 依赖检查 =====") for package_name in packages: if importlib.util.find_spec(package_name) is None: print(f"⚠️ 未找到库: '{package_name}',正在尝试自动安装...") try: subprocess.check_call([sys.executable, "-m", "pip", "install", package_name]) print(f"✅ 成功安装 '{package_name}'") except subprocess.CalledProcessError: print(f"❌ 安装 '{package_name}' 失败。请手动运行 'pip install {package_name}' 后再试。") sys.exit(1) else: print(f"✔️ 库 '{package_name}' 已安装。") print("===== 依赖检查完成 =====\n") # --- 配置 --- REQUIRED_PACKAGES = ["requests","uvicorn[standard]"] TXT_URL = os.environ.get('TXT_URL', "") LOCAL_CODE_FILE = "api.py" # 根据您的日志更新了文件名 PORT = 7860 # 根据您的日志更新了端口号 CHECK_INTERVAL = 20 # --- 全局变量 --- current_code_identifier = None stop_event = threading.Event() def update_local_code_file(): """检查远程代码更新。如果有更新,则下载并覆盖本地文件。""" global current_code_identifier # 为了简洁,这里省略了函数的内部代码,它和上一版完全相同 print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] 正在检查代码更新...") try: headers = {} if current_code_identifier and current_code_identifier.startswith('"'): headers['If-None-Match'] = current_code_identifier response = requests.get(TXT_URL, headers=headers, timeout=10) if response.status_code == 304: print("代码无变化。") return False response.raise_for_status() new_code_content = response.text new_etag = response.headers.get('ETag') new_identifier = new_etag if new_etag else hash(new_code_content) if new_identifier != current_code_identifier: print("检测到代码更新!正在更新本地文件...") with open(LOCAL_CODE_FILE, "w", encoding="utf-8") as f: f.write(new_code_content) current_code_identifier = new_identifier print(f"本地文件 '{LOCAL_CODE_FILE}' 已更新。Uvicorn 将自动重载。") return True else: print("代码无变化。") return False except requests.exceptions.RequestException as e: print(f"检查更新失败: {e}") return False def monitor_in_background(): """在后台线程中运行的监视器循环。""" while not stop_event.is_set(): update_local_code_file() stop_event.wait(CHECK_INTERVAL) if __name__ == "__main__": # 0. 首先,检查并安装所有依赖! check_and_install_packages(REQUIRED_PACKAGES) # 动态导入 requests 模块,因为它现在才确认被安装 import requests # 1. 首次运行:必须先下载一次代码 print("首次启动,正在下载初始代码...") update_local_code_file() if not os.path.exists(LOCAL_CODE_FILE): print(f"初始代码下载失败或内容为空,无法启动服务器。请检查URL: {TXT_URL}") sys.exit(1) # 2. 启动后台线程来监控未来的更新 monitor_thread = threading.Thread(target=monitor_in_background) monitor_thread.daemon = True monitor_thread.start() print("后台更新监视器已启动。") # 3. 在主线程中启动 Uvicorn module_name = LOCAL_CODE_FILE.replace('.py', '') command = [ sys.executable, "-m", "uvicorn", f"{module_name}:app", "--host", "0.0.0.0", "--port", str(PORT), "--reload" ] uvicorn_process = None try: print(f"正在启动 Uvicorn (reload模式)... 命令: {' '.join(command)}") uvicorn_process = subprocess.Popen(command) uvicorn_process.wait() except KeyboardInterrupt: print("\n检测到手动中断 (Ctrl+C)...") except Exception as e: print(f"\n启动Uvicorn时发生错误: {e}") finally: print("正在关闭后台监视器和Uvicorn...") stop_event.set() if uvicorn_process: uvicorn_process.terminate() print("程序已退出。")