Spaces:
Sleeping
Sleeping
| # ''' | |
| # Description: 监听 todolist 表变化并运行相关脚本 | |
| # Author: Manda & Trae AI | |
| # Version: 1.1 | |
| # Date: 2024-07-26 | |
| # ''' | |
| import time | |
| import subprocess | |
| import logging | |
| from datetime import datetime | |
| import os | |
| import sys | |
| import io | |
| import mysql.connector # 导入 mysql.connector | |
| from dataBaseConnecter import DatabaseConnector # 导入 DatabaseConnector 以获取配置和连接 | |
| sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') | |
| # 配置日志 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('notify_main.log', encoding='utf-8'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| # --- 新增数据库连接和监听逻辑 --- | |
| def get_db_conn(): | |
| """获取数据库连接""" | |
| try: | |
| # 使用 DatabaseConnector 来获取连接,它会处理配置加载和SSL | |
| connector = DatabaseConnector() # 创建实例以加载配置 | |
| conn = connector.connect_db() # 使用其连接方法 | |
| if conn: | |
| logging.info("✅ 数据库连接成功") | |
| return conn | |
| else: | |
| logging.error("❌ 数据库连接失败 (来自 connect_db)") | |
| return None | |
| except Exception as e: | |
| logging.error(f"❌ 获取数据库连接时发生异常: {e}", exc_info=True) | |
| return None | |
| def get_latest_update_time(conn): | |
| """获取 todolist 表的最新更新时间 (假设有更新时间戳字段) | |
| 注意:需要确认 todolist 表是否有合适的更新时间字段。 | |
| 这里暂时假设存在一个名为 'updated_at' 的字段。 | |
| 如果不存在,需要修改为查询 MAX(id) 或其他能代表最新记录的方式。 | |
| """ | |
| with conn.cursor() as cursor: | |
| try: | |
| # !! 重要:请根据实际 todolist 表结构修改这里的查询 !! | |
| # 假设存在 updated_at 字段 | |
| # cursor.execute("SELECT MAX(updated_at) FROM todolist") | |
| # 根据用户提供的表结构,使用 last_modified 字段 | |
| cursor.execute("SELECT MAX(last_modified) FROM todolist") | |
| result = cursor.fetchone() | |
| # 如果表为空,result[0] 可能是 None | |
| return result[0] if result else None | |
| except mysql.connector.Error as err: | |
| logging.error(f"查询最新更新时间失败: {err}") | |
| # 可以考虑返回一个特殊值或重新抛出异常,取决于错误处理策略 | |
| return None # 或者 raise err | |
| # --- 结束新增数据库连接和监听逻辑 --- | |
| def run_script(script_name: str): | |
| """运行指定的Python脚本""" | |
| try: | |
| logging.info(f"开始运行脚本: {script_name}") | |
| # 捕获原始字节输出,不立即进行文本解码 | |
| result = subprocess.run(['python', script_name], | |
| capture_output=True, | |
| check=False) # 使用 check=False 避免在非零退出码时抛出异常 | |
| # 手动解码 stdout 和 stderr,并处理解码错误 | |
| stdout_decoded = result.stdout.decode('utf-8', errors='ignore') | |
| stderr_decoded = result.stderr.decode('utf-8', errors='ignore') | |
| if result.returncode == 0: | |
| logging.info(f"脚本 {script_name} 运行成功") | |
| if stdout_decoded: | |
| logging.info(f"输出: {stdout_decoded}") | |
| else: | |
| logging.error(f"脚本 {script_name} 运行失败 (返回码: {result.returncode})") | |
| if stderr_decoded: | |
| logging.error(f"错误: {stderr_decoded}") | |
| # 即使失败,也记录 stdout 以便调试 | |
| if stdout_decoded: | |
| logging.info(f"失败时的输出: {stdout_decoded}") | |
| except Exception as e: | |
| logging.error(f"运行脚本 {script_name} 时发生错误: {str(e)}") | |
| def run_all_scripts(): | |
| """运行所有脚本""" | |
| logging.info("检测到更新,开始执行所有脚本") | |
| # 获取当前脚本所在的目录 | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| # 确保脚本顺序正确,如果它们之间有依赖关系 | |
| scripts = ['db2txt.py', 'usrSpareTime.py', 'compareDb2txt.py'] | |
| for script in scripts: | |
| # 使用完整的文件路径 | |
| script_path = os.path.join(current_dir, script) | |
| if os.path.exists(script_path): | |
| run_script(script_path) | |
| # 根据脚本执行时间调整等待时间,如果需要的话 | |
| # time.sleep(1) # 短暂等待,避免文件系统或进程间竞争 | |
| else: | |
| logging.error(f"脚本文件不存在: {script_path}") | |
| logging.info("所有脚本执行完成") | |
| # --- 修改 main 函数为监听模式 --- | |
| def monitor_todolist_and_run(interval=5): | |
| """监听 todolist 表并联动执行脚本""" | |
| conn = None | |
| last_update = None | |
| # 初始连接和获取状态 | |
| while conn is None: | |
| conn = get_db_conn() | |
| if conn: | |
| last_update = get_latest_update_time(conn) | |
| logging.info(f"初始 todolist 表状态 (last_modified): {last_update}") | |
| break | |
| else: | |
| logging.warning("数据库连接失败,5秒后重试...") | |
| time.sleep(5) | |
| while True: | |
| time.sleep(interval) | |
| try: | |
| # 检查连接是否仍然有效,如果无效则尝试重连 | |
| if not conn or not conn.is_connected(): | |
| logging.warning("数据库连接丢失,尝试重新连接...") | |
| conn = get_db_conn() | |
| if not conn: | |
| logging.error("重新连接失败,跳过本次检查。") | |
| continue # 跳过这次循环 | |
| else: | |
| # 重连成功后,重新获取一次最新状态,避免因断连期间的更新而误判 | |
| last_update = get_latest_update_time(conn) | |
| logging.info(f"重新连接成功,当前 todolist 表状态 (last_modified): {last_update}") | |
| current_update = get_latest_update_time(conn) | |
| # 检查 current_update 是否为 None (查询失败或表为空) | |
| if current_update is None and last_update is None: | |
| logging.debug("数据库查询失败或表仍为空,继续监听...") | |
| elif current_update != last_update: | |
| logging.info(f"检测到 todolist 表有更新 (last_modified 从 {last_update} 变为 {current_update}), 开始联动执行...") | |
| run_all_scripts() # 触发脚本执行 | |
| last_update = current_update # 更新状态 | |
| else: | |
| logging.debug(f"无更新 (last_modified: {current_update}),继续监听...") | |
| except Exception as e: | |
| logging.error(f"监听或执行过程中发生错误: {e}", exc_info=True) | |
| # 发生未知错误时,尝试关闭可能存在的旧连接,并在下次循环重连 | |
| if conn and conn.is_connected(): | |
| try: | |
| conn.close() | |
| logging.info("因异常关闭数据库连接") | |
| except Exception as close_err: | |
| logging.error(f"关闭连接时出错: {close_err}") | |
| conn = None # 强制下次循环重连 | |
| time.sleep(interval) # 避免错误循环过快 | |
| if __name__ == "__main__": | |
| logging.info("启动 todolist 监听与脚本执行程序") | |
| monitor_todolist_and_run(interval=5) # 设置监听间隔为5秒 | |
| # --- 移除旧的 schedule 相关代码 --- | |
| # def main(): | |
| # logging.info("启动定时任务程序") | |
| # | |
| # # 设置每小时运行一次 | |
| # schedule.every().hour.at(":43").do(run_all_scripts) | |
| # | |
| # # 立即运行一次 | |
| # run_all_scripts() | |
| # | |
| # # 持续运行 | |
| # while True: | |
| # try: | |
| # schedule.run_pending() | |
| # time.sleep(60) | |
| # except Exception as e: | |
| # logging.error(f"运行时发生错误: {str(e)}") | |
| # time.sleep(60) # 发生错误时等待一分钟后继续 | |
| # | |
| # if __name__ == "__main__": | |
| # main() |