# ''' # Description: 监听 todolist 表变化并运行相关脚本 # Author: Manda & Trae AI # Version: 1.1 # Date: 2024-07-26 # ''' import time import subprocess import logging from datetime import datetime import os import sys import io import mysql.connector # 导入 mysql.connector from dataBaseConnecter import DatabaseConnector # 导入 DatabaseConnector 以获取配置和连接 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('notify_main.log', encoding='utf-8'), logging.StreamHandler() ] ) # --- 新增数据库连接和监听逻辑 --- def get_db_conn(): """获取数据库连接""" try: # 使用 DatabaseConnector 来获取连接,它会处理配置加载和SSL connector = DatabaseConnector() # 创建实例以加载配置 conn = connector.connect_db() # 使用其连接方法 if conn: logging.info("✅ 数据库连接成功") return conn else: logging.error("❌ 数据库连接失败 (来自 connect_db)") return None except Exception as e: logging.error(f"❌ 获取数据库连接时发生异常: {e}", exc_info=True) return None def get_latest_update_time(conn): """获取 todolist 表的最新更新时间 (假设有更新时间戳字段) 注意:需要确认 todolist 表是否有合适的更新时间字段。 这里暂时假设存在一个名为 'updated_at' 的字段。 如果不存在,需要修改为查询 MAX(id) 或其他能代表最新记录的方式。 """ with conn.cursor() as cursor: try: # !! 重要:请根据实际 todolist 表结构修改这里的查询 !! # 假设存在 updated_at 字段 # cursor.execute("SELECT MAX(updated_at) FROM todolist") # 根据用户提供的表结构,使用 last_modified 字段 cursor.execute("SELECT MAX(last_modified) FROM todolist") result = cursor.fetchone() # 如果表为空,result[0] 可能是 None return result[0] if result else None except mysql.connector.Error as err: logging.error(f"查询最新更新时间失败: {err}") # 可以考虑返回一个特殊值或重新抛出异常,取决于错误处理策略 return None # 或者 raise err # --- 结束新增数据库连接和监听逻辑 --- def run_script(script_name: str): """运行指定的Python脚本""" try: logging.info(f"开始运行脚本: {script_name}") # 捕获原始字节输出,不立即进行文本解码 result = subprocess.run(['python', script_name], capture_output=True, check=False) # 使用 check=False 避免在非零退出码时抛出异常 # 手动解码 stdout 和 stderr,并处理解码错误 stdout_decoded = result.stdout.decode('utf-8', errors='ignore') stderr_decoded = result.stderr.decode('utf-8', errors='ignore') if result.returncode == 0: logging.info(f"脚本 {script_name} 运行成功") if stdout_decoded: logging.info(f"输出: {stdout_decoded}") else: logging.error(f"脚本 {script_name} 运行失败 (返回码: {result.returncode})") if stderr_decoded: logging.error(f"错误: {stderr_decoded}") # 即使失败,也记录 stdout 以便调试 if stdout_decoded: logging.info(f"失败时的输出: {stdout_decoded}") except Exception as e: logging.error(f"运行脚本 {script_name} 时发生错误: {str(e)}") def run_all_scripts(): """运行所有脚本""" logging.info("检测到更新,开始执行所有脚本") # 获取当前脚本所在的目录 current_dir = os.path.dirname(os.path.abspath(__file__)) # 确保脚本顺序正确,如果它们之间有依赖关系 scripts = ['db2txt.py', 'usrSpareTime.py', 'compareDb2txt.py'] for script in scripts: # 使用完整的文件路径 script_path = os.path.join(current_dir, script) if os.path.exists(script_path): run_script(script_path) # 根据脚本执行时间调整等待时间,如果需要的话 # time.sleep(1) # 短暂等待,避免文件系统或进程间竞争 else: logging.error(f"脚本文件不存在: {script_path}") logging.info("所有脚本执行完成") # --- 修改 main 函数为监听模式 --- def monitor_todolist_and_run(interval=5): """监听 todolist 表并联动执行脚本""" conn = None last_update = None # 初始连接和获取状态 while conn is None: conn = get_db_conn() if conn: last_update = get_latest_update_time(conn) logging.info(f"初始 todolist 表状态 (last_modified): {last_update}") break else: logging.warning("数据库连接失败,5秒后重试...") time.sleep(5) while True: time.sleep(interval) try: # 检查连接是否仍然有效,如果无效则尝试重连 if not conn or not conn.is_connected(): logging.warning("数据库连接丢失,尝试重新连接...") conn = get_db_conn() if not conn: logging.error("重新连接失败,跳过本次检查。") continue # 跳过这次循环 else: # 重连成功后,重新获取一次最新状态,避免因断连期间的更新而误判 last_update = get_latest_update_time(conn) logging.info(f"重新连接成功,当前 todolist 表状态 (last_modified): {last_update}") current_update = get_latest_update_time(conn) # 检查 current_update 是否为 None (查询失败或表为空) if current_update is None and last_update is None: logging.debug("数据库查询失败或表仍为空,继续监听...") elif current_update != last_update: logging.info(f"检测到 todolist 表有更新 (last_modified 从 {last_update} 变为 {current_update}), 开始联动执行...") run_all_scripts() # 触发脚本执行 last_update = current_update # 更新状态 else: logging.debug(f"无更新 (last_modified: {current_update}),继续监听...") except Exception as e: logging.error(f"监听或执行过程中发生错误: {e}", exc_info=True) # 发生未知错误时,尝试关闭可能存在的旧连接,并在下次循环重连 if conn and conn.is_connected(): try: conn.close() logging.info("因异常关闭数据库连接") except Exception as close_err: logging.error(f"关闭连接时出错: {close_err}") conn = None # 强制下次循环重连 time.sleep(interval) # 避免错误循环过快 if __name__ == "__main__": logging.info("启动 todolist 监听与脚本执行程序") monitor_todolist_and_run(interval=5) # 设置监听间隔为5秒 # --- 移除旧的 schedule 相关代码 --- # def main(): # logging.info("启动定时任务程序") # # # 设置每小时运行一次 # schedule.every().hour.at(":43").do(run_all_scripts) # # # 立即运行一次 # run_all_scripts() # # # 持续运行 # while True: # try: # schedule.run_pending() # time.sleep(60) # except Exception as e: # logging.error(f"运行时发生错误: {str(e)}") # time.sleep(60) # 发生错误时等待一分钟后继续 # # if __name__ == "__main__": # main()