File size: 4,570 Bytes
cfa48c0 97caefd 145f072 039e519 f9008dd 1298473 145f072 97caefd 1298473 803d901 145f072 97caefd 039e519 97caefd 039e519 97caefd 039e519 145f072 97caefd 145f072 f9008dd 97caefd 039e519 cfa48c0 97caefd 039e519 97caefd 039e519 cfa48c0 039e519 97caefd 039e519 cfa48c0 039e519 97caefd cfa48c0 145f072 039e519 145f072 97caefd 039e519 145f072 039e519 97caefd 145f072 039e519 916ef49 145f072 039e519 145f072 039e519 97caefd 039e519 97caefd 039e519 97caefd 039e519 145f072 039e519 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 | import sys
import os
import subprocess
import importlib.util
import time
import threading
# --- 关键修正:依赖包检查与安装 ---
def check_and_install_packages(packages):
"""检查并安装所有必需的Python库。"""
print("===== 依赖检查 =====")
for package_name in packages:
if importlib.util.find_spec(package_name) is None:
print(f"⚠️ 未找到库: '{package_name}',正在尝试自动安装...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
print(f"✅ 成功安装 '{package_name}'")
except subprocess.CalledProcessError:
print(f"❌ 安装 '{package_name}' 失败。请手动运行 'pip install {package_name}' 后再试。")
sys.exit(1)
else:
print(f"✔️ 库 '{package_name}' 已安装。")
print("===== 依赖检查完成 =====\n")
# --- 配置 ---
REQUIRED_PACKAGES = ["requests","uvicorn[standard]"]
TXT_URL = os.environ.get('TXT_URL', "")
LOCAL_CODE_FILE = "api.py" # 根据您的日志更新了文件名
PORT = 7860 # 根据您的日志更新了端口号
CHECK_INTERVAL = 20
# --- 全局变量 ---
current_code_identifier = None
stop_event = threading.Event()
def update_local_code_file():
"""检查远程代码更新。如果有更新,则下载并覆盖本地文件。"""
global current_code_identifier
# 为了简洁,这里省略了函数的内部代码,它和上一版完全相同
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] 正在检查代码更新...")
try:
headers = {}
if current_code_identifier and current_code_identifier.startswith('"'):
headers['If-None-Match'] = current_code_identifier
response = requests.get(TXT_URL, headers=headers, timeout=10)
if response.status_code == 304:
print("代码无变化。")
return False
response.raise_for_status()
new_code_content = response.text
new_etag = response.headers.get('ETag')
new_identifier = new_etag if new_etag else hash(new_code_content)
if new_identifier != current_code_identifier:
print("检测到代码更新!正在更新本地文件...")
with open(LOCAL_CODE_FILE, "w", encoding="utf-8") as f:
f.write(new_code_content)
current_code_identifier = new_identifier
print(f"本地文件 '{LOCAL_CODE_FILE}' 已更新。Uvicorn 将自动重载。")
return True
else:
print("代码无变化。")
return False
except requests.exceptions.RequestException as e:
print(f"检查更新失败: {e}")
return False
def monitor_in_background():
"""在后台线程中运行的监视器循环。"""
while not stop_event.is_set():
update_local_code_file()
stop_event.wait(CHECK_INTERVAL)
if __name__ == "__main__":
# 0. 首先,检查并安装所有依赖!
check_and_install_packages(REQUIRED_PACKAGES)
# 动态导入 requests 模块,因为它现在才确认被安装
import requests
# 1. 首次运行:必须先下载一次代码
print("首次启动,正在下载初始代码...")
update_local_code_file()
if not os.path.exists(LOCAL_CODE_FILE):
print(f"初始代码下载失败或内容为空,无法启动服务器。请检查URL: {TXT_URL}")
sys.exit(1)
# 2. 启动后台线程来监控未来的更新
monitor_thread = threading.Thread(target=monitor_in_background)
monitor_thread.daemon = True
monitor_thread.start()
print("后台更新监视器已启动。")
# 3. 在主线程中启动 Uvicorn
module_name = LOCAL_CODE_FILE.replace('.py', '')
command = [
sys.executable, "-m", "uvicorn",
f"{module_name}:app",
"--host", "0.0.0.0",
"--port", str(PORT),
"--reload"
]
uvicorn_process = None
try:
print(f"正在启动 Uvicorn (reload模式)... 命令: {' '.join(command)}")
uvicorn_process = subprocess.Popen(command)
uvicorn_process.wait()
except KeyboardInterrupt:
print("\n检测到手动中断 (Ctrl+C)...")
except Exception as e:
print(f"\n启动Uvicorn时发生错误: {e}")
finally:
print("正在关闭后台监视器和Uvicorn...")
stop_event.set()
if uvicorn_process:
uvicorn_process.terminate()
print("程序已退出。") |