ToDoAgent / Notify /notifyMain.py
Siyu Wang
updated to KK_Server
84ed1d1
# '''
# Description: 监听 todolist 表变化并运行相关脚本
# Author: Manda & Trae AI
# Version: 1.1
# Date: 2024-07-26
# '''
import time
import subprocess
import logging
from datetime import datetime
import os
import sys
import io
import mysql.connector # 导入 mysql.connector
from dataBaseConnecter import DatabaseConnector # 导入 DatabaseConnector 以获取配置和连接
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('notify_main.log', encoding='utf-8'),
logging.StreamHandler()
]
)
# --- 新增数据库连接和监听逻辑 ---
def get_db_conn():
"""获取数据库连接"""
try:
# 使用 DatabaseConnector 来获取连接,它会处理配置加载和SSL
connector = DatabaseConnector() # 创建实例以加载配置
conn = connector.connect_db() # 使用其连接方法
if conn:
logging.info("✅ 数据库连接成功")
return conn
else:
logging.error("❌ 数据库连接失败 (来自 connect_db)")
return None
except Exception as e:
logging.error(f"❌ 获取数据库连接时发生异常: {e}", exc_info=True)
return None
def get_latest_update_time(conn):
"""获取 todolist 表的最新更新时间 (假设有更新时间戳字段)
注意:需要确认 todolist 表是否有合适的更新时间字段。
这里暂时假设存在一个名为 'updated_at' 的字段。
如果不存在,需要修改为查询 MAX(id) 或其他能代表最新记录的方式。
"""
with conn.cursor() as cursor:
try:
# !! 重要:请根据实际 todolist 表结构修改这里的查询 !!
# 假设存在 updated_at 字段
# cursor.execute("SELECT MAX(updated_at) FROM todolist")
# 根据用户提供的表结构,使用 last_modified 字段
cursor.execute("SELECT MAX(last_modified) FROM todolist")
result = cursor.fetchone()
# 如果表为空,result[0] 可能是 None
return result[0] if result else None
except mysql.connector.Error as err:
logging.error(f"查询最新更新时间失败: {err}")
# 可以考虑返回一个特殊值或重新抛出异常,取决于错误处理策略
return None # 或者 raise err
# --- 结束新增数据库连接和监听逻辑 ---
def run_script(script_name: str):
"""运行指定的Python脚本"""
try:
logging.info(f"开始运行脚本: {script_name}")
# 捕获原始字节输出,不立即进行文本解码
result = subprocess.run(['python', script_name],
capture_output=True,
check=False) # 使用 check=False 避免在非零退出码时抛出异常
# 手动解码 stdout 和 stderr,并处理解码错误
stdout_decoded = result.stdout.decode('utf-8', errors='ignore')
stderr_decoded = result.stderr.decode('utf-8', errors='ignore')
if result.returncode == 0:
logging.info(f"脚本 {script_name} 运行成功")
if stdout_decoded:
logging.info(f"输出: {stdout_decoded}")
else:
logging.error(f"脚本 {script_name} 运行失败 (返回码: {result.returncode})")
if stderr_decoded:
logging.error(f"错误: {stderr_decoded}")
# 即使失败,也记录 stdout 以便调试
if stdout_decoded:
logging.info(f"失败时的输出: {stdout_decoded}")
except Exception as e:
logging.error(f"运行脚本 {script_name} 时发生错误: {str(e)}")
def run_all_scripts():
"""运行所有脚本"""
logging.info("检测到更新,开始执行所有脚本")
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 确保脚本顺序正确,如果它们之间有依赖关系
scripts = ['db2txt.py', 'usrSpareTime.py', 'compareDb2txt.py']
for script in scripts:
# 使用完整的文件路径
script_path = os.path.join(current_dir, script)
if os.path.exists(script_path):
run_script(script_path)
# 根据脚本执行时间调整等待时间,如果需要的话
# time.sleep(1) # 短暂等待,避免文件系统或进程间竞争
else:
logging.error(f"脚本文件不存在: {script_path}")
logging.info("所有脚本执行完成")
# --- 修改 main 函数为监听模式 ---
def monitor_todolist_and_run(interval=5):
"""监听 todolist 表并联动执行脚本"""
conn = None
last_update = None
# 初始连接和获取状态
while conn is None:
conn = get_db_conn()
if conn:
last_update = get_latest_update_time(conn)
logging.info(f"初始 todolist 表状态 (last_modified): {last_update}")
break
else:
logging.warning("数据库连接失败,5秒后重试...")
time.sleep(5)
while True:
time.sleep(interval)
try:
# 检查连接是否仍然有效,如果无效则尝试重连
if not conn or not conn.is_connected():
logging.warning("数据库连接丢失,尝试重新连接...")
conn = get_db_conn()
if not conn:
logging.error("重新连接失败,跳过本次检查。")
continue # 跳过这次循环
else:
# 重连成功后,重新获取一次最新状态,避免因断连期间的更新而误判
last_update = get_latest_update_time(conn)
logging.info(f"重新连接成功,当前 todolist 表状态 (last_modified): {last_update}")
current_update = get_latest_update_time(conn)
# 检查 current_update 是否为 None (查询失败或表为空)
if current_update is None and last_update is None:
logging.debug("数据库查询失败或表仍为空,继续监听...")
elif current_update != last_update:
logging.info(f"检测到 todolist 表有更新 (last_modified 从 {last_update} 变为 {current_update}), 开始联动执行...")
run_all_scripts() # 触发脚本执行
last_update = current_update # 更新状态
else:
logging.debug(f"无更新 (last_modified: {current_update}),继续监听...")
except Exception as e:
logging.error(f"监听或执行过程中发生错误: {e}", exc_info=True)
# 发生未知错误时,尝试关闭可能存在的旧连接,并在下次循环重连
if conn and conn.is_connected():
try:
conn.close()
logging.info("因异常关闭数据库连接")
except Exception as close_err:
logging.error(f"关闭连接时出错: {close_err}")
conn = None # 强制下次循环重连
time.sleep(interval) # 避免错误循环过快
if __name__ == "__main__":
logging.info("启动 todolist 监听与脚本执行程序")
monitor_todolist_and_run(interval=5) # 设置监听间隔为5秒
# --- 移除旧的 schedule 相关代码 ---
# def main():
# logging.info("启动定时任务程序")
#
# # 设置每小时运行一次
# schedule.every().hour.at(":43").do(run_all_scripts)
#
# # 立即运行一次
# run_all_scripts()
#
# # 持续运行
# while True:
# try:
# schedule.run_pending()
# time.sleep(60)
# except Exception as e:
# logging.error(f"运行时发生错误: {str(e)}")
# time.sleep(60) # 发生错误时等待一分钟后继续
#
# if __name__ == "__main__":
# main()