Spaces:
Sleeping
Sleeping
File size: 2,546 Bytes
84ed1d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# '''
# Description: 定时运行数据库相关脚本的主程序
# Author: Manda
# Version: 1.0
# Date: 2024-03-30
# '''
import schedule
import time
import subprocess
import logging
from datetime import datetime
import os
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('notify_main.log', encoding='utf-8'),
logging.StreamHandler()
]
)
def run_script(script_name: str):
"""运行指定的Python脚本"""
try:
logging.info(f"开始运行脚本: {script_name}")
result = subprocess.run(['python', script_name],
capture_output=True,
text=True,
encoding='utf-8') # 明确指定使用 UTF-8 编码
if result.returncode == 0:
logging.info(f"脚本 {script_name} 运行成功")
if result.stdout:
logging.info(f"输出: {result.stdout}")
else:
logging.error(f"脚本 {script_name} 运行失败")
if result.stderr:
logging.error(f"错误: {result.stderr}")
except Exception as e:
logging.error(f"运行脚本 {script_name} 时发生错误: {str(e)}")
def run_all_scripts():
"""运行所有脚本"""
logging.info("开始执行所有脚本")
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
scripts = ['db2txt.py', 'usrSpareTime.py', 'compareDb2txt.py']
for script in scripts:
# 使用完整的文件路径
script_path = os.path.join(current_dir, script)
if os.path.exists(script_path):
run_script(script_path)
time.sleep(5)
else:
logging.error(f"脚本文件不存在: {script_path}")
logging.info("所有脚本执行完成")
def main():
logging.info("启动定时任务程序")
# 设置每小时运行一次
schedule.every().hour.at(":43").do(run_all_scripts)
# 立即运行一次
run_all_scripts()
# 持续运行
while True:
try:
schedule.run_pending()
time.sleep(60)
except Exception as e:
logging.error(f"运行时发生错误: {str(e)}")
time.sleep(60) # 发生错误时等待一分钟后继续
if __name__ == "__main__":
main() |