Spaces:
Sleeping
Sleeping
File size: 8,307 Bytes
84ed1d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# '''
# Description: 监听 todolist 表变化并运行相关脚本
# Author: Manda & Trae AI
# Version: 1.1
# Date: 2024-07-26
# '''
import time
import subprocess
import logging
from datetime import datetime
import os
import sys
import io
import mysql.connector # 导入 mysql.connector
from dataBaseConnecter import DatabaseConnector # 导入 DatabaseConnector 以获取配置和连接
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('notify_main.log', encoding='utf-8'),
logging.StreamHandler()
]
)
# --- 新增数据库连接和监听逻辑 ---
def get_db_conn():
"""获取数据库连接"""
try:
# 使用 DatabaseConnector 来获取连接,它会处理配置加载和SSL
connector = DatabaseConnector() # 创建实例以加载配置
conn = connector.connect_db() # 使用其连接方法
if conn:
logging.info("✅ 数据库连接成功")
return conn
else:
logging.error("❌ 数据库连接失败 (来自 connect_db)")
return None
except Exception as e:
logging.error(f"❌ 获取数据库连接时发生异常: {e}", exc_info=True)
return None
def get_latest_update_time(conn):
"""获取 todolist 表的最新更新时间 (假设有更新时间戳字段)
注意:需要确认 todolist 表是否有合适的更新时间字段。
这里暂时假设存在一个名为 'updated_at' 的字段。
如果不存在,需要修改为查询 MAX(id) 或其他能代表最新记录的方式。
"""
with conn.cursor() as cursor:
try:
# !! 重要:请根据实际 todolist 表结构修改这里的查询 !!
# 假设存在 updated_at 字段
# cursor.execute("SELECT MAX(updated_at) FROM todolist")
# 根据用户提供的表结构,使用 last_modified 字段
cursor.execute("SELECT MAX(last_modified) FROM todolist")
result = cursor.fetchone()
# 如果表为空,result[0] 可能是 None
return result[0] if result else None
except mysql.connector.Error as err:
logging.error(f"查询最新更新时间失败: {err}")
# 可以考虑返回一个特殊值或重新抛出异常,取决于错误处理策略
return None # 或者 raise err
# --- 结束新增数据库连接和监听逻辑 ---
def run_script(script_name: str):
"""运行指定的Python脚本"""
try:
logging.info(f"开始运行脚本: {script_name}")
# 捕获原始字节输出,不立即进行文本解码
result = subprocess.run(['python', script_name],
capture_output=True,
check=False) # 使用 check=False 避免在非零退出码时抛出异常
# 手动解码 stdout 和 stderr,并处理解码错误
stdout_decoded = result.stdout.decode('utf-8', errors='ignore')
stderr_decoded = result.stderr.decode('utf-8', errors='ignore')
if result.returncode == 0:
logging.info(f"脚本 {script_name} 运行成功")
if stdout_decoded:
logging.info(f"输出: {stdout_decoded}")
else:
logging.error(f"脚本 {script_name} 运行失败 (返回码: {result.returncode})")
if stderr_decoded:
logging.error(f"错误: {stderr_decoded}")
# 即使失败,也记录 stdout 以便调试
if stdout_decoded:
logging.info(f"失败时的输出: {stdout_decoded}")
except Exception as e:
logging.error(f"运行脚本 {script_name} 时发生错误: {str(e)}")
def run_all_scripts():
"""运行所有脚本"""
logging.info("检测到更新,开始执行所有脚本")
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 确保脚本顺序正确,如果它们之间有依赖关系
scripts = ['db2txt.py', 'usrSpareTime.py', 'compareDb2txt.py']
for script in scripts:
# 使用完整的文件路径
script_path = os.path.join(current_dir, script)
if os.path.exists(script_path):
run_script(script_path)
# 根据脚本执行时间调整等待时间,如果需要的话
# time.sleep(1) # 短暂等待,避免文件系统或进程间竞争
else:
logging.error(f"脚本文件不存在: {script_path}")
logging.info("所有脚本执行完成")
# --- 修改 main 函数为监听模式 ---
def monitor_todolist_and_run(interval=5):
"""监听 todolist 表并联动执行脚本"""
conn = None
last_update = None
# 初始连接和获取状态
while conn is None:
conn = get_db_conn()
if conn:
last_update = get_latest_update_time(conn)
logging.info(f"初始 todolist 表状态 (last_modified): {last_update}")
break
else:
logging.warning("数据库连接失败,5秒后重试...")
time.sleep(5)
while True:
time.sleep(interval)
try:
# 检查连接是否仍然有效,如果无效则尝试重连
if not conn or not conn.is_connected():
logging.warning("数据库连接丢失,尝试重新连接...")
conn = get_db_conn()
if not conn:
logging.error("重新连接失败,跳过本次检查。")
continue # 跳过这次循环
else:
# 重连成功后,重新获取一次最新状态,避免因断连期间的更新而误判
last_update = get_latest_update_time(conn)
logging.info(f"重新连接成功,当前 todolist 表状态 (last_modified): {last_update}")
current_update = get_latest_update_time(conn)
# 检查 current_update 是否为 None (查询失败或表为空)
if current_update is None and last_update is None:
logging.debug("数据库查询失败或表仍为空,继续监听...")
elif current_update != last_update:
logging.info(f"检测到 todolist 表有更新 (last_modified 从 {last_update} 变为 {current_update}), 开始联动执行...")
run_all_scripts() # 触发脚本执行
last_update = current_update # 更新状态
else:
logging.debug(f"无更新 (last_modified: {current_update}),继续监听...")
except Exception as e:
logging.error(f"监听或执行过程中发生错误: {e}", exc_info=True)
# 发生未知错误时,尝试关闭可能存在的旧连接,并在下次循环重连
if conn and conn.is_connected():
try:
conn.close()
logging.info("因异常关闭数据库连接")
except Exception as close_err:
logging.error(f"关闭连接时出错: {close_err}")
conn = None # 强制下次循环重连
time.sleep(interval) # 避免错误循环过快
if __name__ == "__main__":
logging.info("启动 todolist 监听与脚本执行程序")
monitor_todolist_and_run(interval=5) # 设置监听间隔为5秒
# --- 移除旧的 schedule 相关代码 ---
# def main():
# logging.info("启动定时任务程序")
#
# # 设置每小时运行一次
# schedule.every().hour.at(":43").do(run_all_scripts)
#
# # 立即运行一次
# run_all_scripts()
#
# # 持续运行
# while True:
# try:
# schedule.run_pending()
# time.sleep(60)
# except Exception as e:
# logging.error(f"运行时发生错误: {str(e)}")
# time.sleep(60) # 发生错误时等待一分钟后继续
#
# if __name__ == "__main__":
# main() |